agent_patch
runs/2026-02-28__13-38-10__gpt-5-1-codex-mini/flux-pr-1155/flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/agent-logs/agent.patch
22722 bytes
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
index 1d45769..b002c91 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
+++ b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
@@ -51,6 +51,12 @@ var (
}
)
+// GRPCClientFactory is a callback that supplies a gRPC client connection on demand.
+type GRPCClientFactory func(context.Context) (grpc.ClientConnInterface, error)
+
+// FactoryOption configures the GraphQL datasource factory.
+type FactoryOption func(*Factory[Configuration])
+
type Planner[T Configuration] struct {
id int
debug bool
@@ -87,7 +93,11 @@ type Planner[T Configuration] struct {
minifier *astminify.Minifier
// gRPC
- grpcClient grpc.ClientConnInterface
+ grpcClient grpc.ClientConnInterface
+ grpcClientFactory GRPCClientFactory
+ grpcDisabled bool
+ grpcDisabledMessage string
+ executionContext context.Context
}
func (p *Planner[T]) EnableSubgraphRequestMinifier() {
@@ -354,26 +364,35 @@ func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
dataSource = &Source{httpClient: p.fetchClient}
if p.config.grpc != nil {
- var err error
+ if p.grpcDisabled {
+ dataSource = newDisabledGRPCDataSource(p.grpcDisabledMessage)
+ } else {
+ opDocument, opReport := astparser.ParseGraphqlDocumentBytes(operation)
+ if opReport.HasErrors() {
+ p.stopWithError(errors.WithStack(fmt.Errorf("failed to parse operation: %w", opReport)))
+ return resolve.FetchConfiguration{}
+ }
- opDocument, opReport := astparser.ParseGraphqlDocumentBytes(operation)
- if opReport.HasErrors() {
- p.stopWithError(errors.WithStack(fmt.Errorf("failed to parse operation: %w", opReport)))
- return resolve.FetchConfiguration{}
- }
+ client, clientErr := p.getGRPCClient()
+ if clientErr != nil {
+ p.stopWithError(errors.WithStack(fmt.Errorf("failed to resolve gRPC client: %w", clientErr)))
+ return resolve.FetchConfiguration{}
+ }
- dataSource, err = grpcdatasource.NewDataSource(p.grpcClient, grpcdatasource.DataSourceConfig{
- Operation: &opDocument,
- Definition: p.config.schemaConfiguration.upstreamSchemaAst,
- Mapping: p.config.grpc.Mapping,
- Compiler: p.config.grpc.Compiler,
- // TODO: remove fallback logic in visitor for subgraph name and
- // add proper error handling if the subgraph name is not set in the mapping
- SubgraphName: p.dataSourceConfig.Name(),
- })
- if err != nil {
- p.stopWithError(errors.WithStack(fmt.Errorf("failed to create gRPC datasource: %w", err)))
- return resolve.FetchConfiguration{}
+ var err error
+ dataSource, err = grpcdatasource.NewDataSource(client, grpcdatasource.DataSourceConfig{
+ Operation: &opDocument,
+ Definition: p.config.schemaConfiguration.upstreamSchemaAst,
+ Mapping: p.config.grpc.Mapping,
+ Compiler: p.config.grpc.Compiler,
+ // TODO: remove fallback logic in visitor for subgraph name and
+ // add proper error handling if the subgraph name is not set in the mapping
+ SubgraphName: p.dataSourceConfig.Name(),
+ })
+ if err != nil {
+ p.stopWithError(errors.WithStack(fmt.Errorf("failed to create gRPC datasource: %w", err)))
+ return resolve.FetchConfiguration{}
+ }
}
}
@@ -389,6 +408,27 @@ func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
}
}
+func (p *Planner[T]) getGRPCClient() (grpc.ClientConnInterface, error) {
+ if p.grpcClient != nil {
+ return p.grpcClient, nil
+ }
+
+ if p.grpcClientFactory != nil {
+ client, err := p.grpcClientFactory(p.executionContext)
+ if err != nil {
+ return nil, err
+ }
+
+ if client == nil {
+ return nil, fmt.Errorf("grpc client factory returned nil client")
+ }
+
+ return client, nil
+ }
+
+ return nil, fmt.Errorf("grpc client is required")
+}
+
func (p *Planner[T]) shouldSelectSingleEntity() bool {
return p.dataSourcePlannerConfig.HasRequiredFields() &&
p.dataSourcePlannerConfig.PathType == plan.PlannerPathObject
@@ -1701,10 +1741,13 @@ var (
)
type Factory[T Configuration] struct {
- executionContext context.Context
- httpClient *http.Client
- grpcClient grpc.ClientConnInterface
- subscriptionClient GraphQLSubscriptionClient
+ executionContext context.Context
+ httpClient *http.Client
+ grpcClient grpc.ClientConnInterface
+ grpcClientFactory GRPCClientFactory
+ grpcDisabled bool
+ grpcDisabledMessage string
+ subscriptionClient GraphQLSubscriptionClient
}
// NewFactory (HTTP) creates a new factory for the GraphQL datasource planner
@@ -1731,19 +1774,46 @@ func NewFactory(executionContext context.Context, httpClient *http.Client, subsc
// NewFactory (GRPC) creates a new factory for the GraphQL datasource planner
// Graphql Datasource could be stateful in case you are using subscriptions,
// make sure you are using the same execution context for all datasources
-func NewFactoryGRPC(executionContext context.Context, grpcClient grpc.ClientConnInterface) (*Factory[Configuration], error) {
+func NewFactoryGRPC(executionContext context.Context, grpcClient grpc.ClientConnInterface, opts ...FactoryOption) (*Factory[Configuration], error) {
if executionContext == nil {
return nil, fmt.Errorf("execution context is required")
}
- if grpcClient == nil {
+ factory := &Factory[Configuration]{
+ executionContext: executionContext,
+ grpcClient: grpcClient,
+ }
+
+ for _, opt := range opts {
+ opt(factory)
+ }
+
+ if factory.grpcDisabled && factory.grpcDisabledMessage == "" {
+ factory.grpcDisabledMessage = "gRPC datasource is disabled"
+ }
+
+ if !factory.grpcDisabled && factory.grpcClient == nil && factory.grpcClientFactory == nil {
return nil, fmt.Errorf("grpc client is required")
}
- return &Factory[Configuration]{
- executionContext: executionContext,
- grpcClient: grpcClient,
- }, nil
+ return factory, nil
+}
+
+// WithGRPCClientFactory sets a callback to provide a gRPC client on demand.
+func WithGRPCClientFactory(factory GRPCClientFactory) FactoryOption {
+ return func(o *Factory[Configuration]) {
+ o.grpcClientFactory = factory
+ }
+}
+
+// WithGRPCDataSourceDisabled marks the gRPC datasource as disabled.
+func WithGRPCDataSourceDisabled(message string) FactoryOption {
+ return func(o *Factory[Configuration]) {
+ o.grpcDisabled = true
+ if message != "" {
+ o.grpcDisabledMessage = message
+ }
+ }
}
func (p *Planner[T]) getKit() *printKit {
@@ -1758,9 +1828,13 @@ func (p *Planner[T]) releaseKit(kit *printKit) {
func (f *Factory[T]) Planner(logger abstractlogger.Logger) plan.DataSourcePlanner[T] {
return &Planner[T]{
- fetchClient: f.httpClient,
- grpcClient: f.grpcClient,
- subscriptionClient: f.subscriptionClient,
+ fetchClient: f.httpClient,
+ grpcClient: f.grpcClient,
+ grpcClientFactory: f.grpcClientFactory,
+ grpcDisabled: f.grpcDisabled,
+ grpcDisabledMessage: f.grpcDisabledMessage,
+ executionContext: f.executionContext,
+ subscriptionClient: f.subscriptionClient,
}
}
@@ -1869,6 +1943,28 @@ func (s *Source) Load(ctx context.Context, input []byte, out *bytes.Buffer) (err
return httpclient.Do(s.httpClient, ctx, input, out)
}
+type disabledGRPCDataSource struct {
+ err error
+}
+
+func newDisabledGRPCDataSource(message string) resolve.DataSource {
+ if message == "" {
+ message = "gRPC datasource is disabled"
+ }
+
+ return &disabledGRPCDataSource{
+ err: errors.New(message),
+ }
+}
+
+func (d *disabledGRPCDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) (err error) {
+ return d.err
+}
+
+func (d *disabledGRPCDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) (err error) {
+ return d.err
+}
+
type GraphQLSubscriptionClient interface {
// Subscribe to the origin source. The implementation must not block the calling goroutine.
Subscribe(ctx *resolve.Context, options GraphQLSubscriptionOptions, updater resolve.SubscriptionUpdater) error
diff --git a/app/v2/pkg/engine/datasource/graphql_datasource/grpc_factory_test.go b/app/v2/pkg/engine/datasource/graphql_datasource/grpc_factory_test.go
new file mode 100644
index 0000000..300c8fe
--- /dev/null
+++ b/app/v2/pkg/engine/datasource/graphql_datasource/grpc_factory_test.go
@@ -0,0 +1,44 @@
+package graphql_datasource
+
+import (
+ "context"
+ "testing"
+
+ "github.com/jensneuse/abstractlogger"
+ "github.com/stretchr/testify/require"
+ "google.golang.org/grpc"
+)
+
+type stubGRPCClient struct{}
+
+func (stubGRPCClient) Invoke(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error {
+ return nil
+}
+
+func (stubGRPCClient) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) {
+ return nil, nil
+}
+
+func TestNewFactoryGRPCRequiresClient(t *testing.T) {
+ _, err := NewFactoryGRPC(context.Background(), nil)
+ require.Error(t, err)
+}
+
+func TestNewFactoryGRPCWithClientFactory(t *testing.T) {
+ factory, err := NewFactoryGRPC(context.Background(), nil, WithGRPCClientFactory(func(ctx context.Context) (grpc.ClientConnInterface, error) {
+ return stubGRPCClient{}, nil
+ }))
+ require.NoError(t, err)
+
+ planner := factory.Planner(abstractlogger.Noop{}).(*Planner[Configuration])
+ client, err := planner.getGRPCClient()
+ require.NoError(t, err)
+ require.IsType(t, stubGRPCClient{}, client)
+}
+
+func TestNewFactoryGRPCDisabled(t *testing.T) {
+ factory, err := NewFactoryGRPC(context.Background(), nil, WithGRPCDataSourceDisabled("off"))
+ require.NoError(t, err)
+ require.True(t, factory.grpcDisabled)
+ require.Equal(t, "off", factory.grpcDisabledMessage)
+}
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go b/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go
index ed8b887..1da7c1c 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go
@@ -407,9 +407,37 @@ func (p *RPCCompiler) buildProtoMessage(inputMessage Message, rpcMessage *RPCMes
}
// Process each element and append to the list
+ if field.MessageRef >= 0 && rpcField != nil && rpcField.Message != nil {
+ for _, element := range elements {
+ fieldMsg := p.buildProtoMessage(p.doc.Messages[field.MessageRef], rpcField.Message, element)
+ if fieldMsg == nil {
+ continue
+ }
+ list.Append(protoref.ValueOfMessage(fieldMsg))
+ }
+ continue
+ }
+
+ if field.Type == DataTypeEnum {
+ enum, ok := p.doc.EnumByName(rpcField.EnumName)
+ if !ok {
+ p.report.AddInternalError(fmt.Errorf("enum %s not found in document", rpcField.EnumName))
+ continue
+ }
+
+ for _, element := range elements {
+ for _, enumValue := range enum.Values {
+ if enumValue.GraphqlValue == element.String() {
+ list.Append(protoref.ValueOfEnum(protoref.EnumNumber(enumValue.Number)))
+ break
+ }
+ }
+ }
+ continue
+ }
+
for _, element := range elements {
- fieldMsg := p.buildProtoMessage(p.doc.Messages[field.MessageRef], rpcField.Message, element)
- list.Append(protoref.ValueOfMessage(fieldMsg))
+ list.Append(p.setValueForKind(field.Type, element))
}
continue
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/compiler_test.go b/app/v2/pkg/engine/datasource/grpc_datasource/compiler_test.go
index 5e098c3..8ef232a 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/compiler_test.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/compiler_test.go
@@ -291,3 +291,76 @@ func TestBuildProtoMessage(t *testing.T) {
require.Equal(t, 1, len(invocations))
}
+
+func TestBuildProtoMessageRepeatedScalars(t *testing.T) {
+ protoSchema := `
+syntax = "proto3";
+package repeated;
+
+message ScalarListRequest {
+ repeated string tags = 1;
+ repeated int32 counts = 2;
+}
+
+message ScalarListResponse {
+ repeated string tags = 1;
+}
+
+service ScalarService {
+ rpc List(ScalarListRequest) returns (ScalarListResponse);
+}
+`
+
+ compiler, err := NewProtoCompiler(protoSchema, nil)
+ require.NoError(t, err)
+
+ executionPlan := &RPCExecutionPlan{
+ Calls: []RPCCall{
+ {
+ MethodName: "List",
+ Request: RPCMessage{
+ Name: "ScalarListRequest",
+ Fields: []RPCField{
+ {
+ Name: "tags",
+ TypeName: string(DataTypeString),
+ JSONPath: "tags",
+ Repeated: true,
+ },
+ {
+ Name: "counts",
+ TypeName: string(DataTypeInt32),
+ JSONPath: "counts",
+ Repeated: true,
+ },
+ },
+ },
+ Response: RPCMessage{Name: "ScalarListResponse"},
+ },
+ },
+ }
+
+ variables := []byte(`{"tags":["alpha","beta"],"counts":[1,2,3]}`)
+
+ invocations, err := compiler.Compile(executionPlan, gjson.ParseBytes(variables))
+ require.NoError(t, err)
+ require.Len(t, invocations, 1)
+
+ input := invocations[0].Input
+ fields := input.Descriptor().Fields()
+
+ tags := fields.ByName("tags")
+ require.NotNil(t, tags)
+ tagList := input.Get(tags).List()
+ require.Equal(t, 2, tagList.Len())
+ require.Equal(t, "alpha", tagList.Get(0).String())
+ require.Equal(t, "beta", tagList.Get(1).String())
+
+ counts := fields.ByName("counts")
+ require.NotNil(t, counts)
+ countList := input.Get(counts).List()
+ require.Equal(t, 3, countList.Len())
+ require.Equal(t, int32(1), int32(countList.Get(0).Int()))
+ require.Equal(t, int32(2), int32(countList.Get(1).Int()))
+ require.Equal(t, int32(3), int32(countList.Get(2).Int()))
+}
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
index 940d501..be9521f 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
@@ -17,6 +17,7 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"google.golang.org/grpc"
+ "google.golang.org/grpc/status"
protoref "google.golang.org/protobuf/reflect/protoreflect"
)
@@ -161,79 +162,114 @@ func (d *DataSource) marshalResponseJSON(arena *astjson.Arena, message *RPCMessa
arr := arena.NewArray()
root.Set(field.JSONPath, arr)
list := data.Get(fd).List()
+ if list == nil {
+ continue
+ }
+
for i := 0; i < list.Len(); i++ {
- message := list.Get(i).Message()
- value, err := d.marshalResponseJSON(arena, field.Message, message)
- if err != nil {
- return nil, err
+ element := list.Get(i)
+ if fd.Kind() == protoref.MessageKind && field.Message != nil {
+ if !element.IsValid() {
+ arr.SetArrayItem(i, arena.NewNull())
+ continue
+ }
+
+ msg := element.Message()
+ if msg == nil {
+ arr.SetArrayItem(i, arena.NewNull())
+ continue
+ }
+
+ value, err := d.marshalResponseJSON(arena, field.Message, msg)
+ if err != nil {
+ return nil, err
+ }
+
+ arr.SetArrayItem(i, value)
+ continue
}
- arr.SetArrayItem(i, value)
+ arr.SetArrayItem(i, d.newJSONScalarValue(arena, fd, element))
}
continue
}
if fd.Kind() == protoref.MessageKind {
- msg := data.Get(fd).Message()
- value, err := d.marshalResponseJSON(arena, field.Message, msg)
+ value := data.Get(fd)
+ if !value.IsValid() {
+ continue
+ }
+
+ msg := value.Message()
+ if msg == nil {
+ continue
+ }
+
+ marshaled, err := d.marshalResponseJSON(arena, field.Message, msg)
if err != nil {
return nil, err
}
if field.JSONPath == "" {
- root, _, err = astjson.MergeValues(root, value)
+ root, _, err = astjson.MergeValues(root, marshaled)
if err != nil {
return nil, err
}
} else {
- root.Set(field.JSONPath, value)
+ root.Set(field.JSONPath, marshaled)
}
continue
}
- d.setJSONValue(arena, root, field.JSONPath, data, fd)
+ d.setJSONValue(arena, root, field.JSONPath, data.Get(fd), fd)
}
return root, nil
}
-func (d *DataSource) setJSONValue(arena *astjson.Arena, root *astjson.Value, name string, data protoref.Message, fd protoref.FieldDescriptor) {
+func (d *DataSource) setJSONValue(arena *astjson.Arena, root *astjson.Value, name string, value protoref.Value, fd protoref.FieldDescriptor) {
+ root.Set(name, d.newJSONScalarValue(arena, fd, value))
+}
+
+func (d *DataSource) newJSONScalarValue(arena *astjson.Arena, fd protoref.FieldDescriptor, value protoref.Value) *astjson.Value {
+ if !value.IsValid() {
+ return arena.NewNull()
+ }
+
switch fd.Kind() {
case protoref.BoolKind:
- boolValue := data.Get(fd).Bool()
- if boolValue {
- root.Set(name, arena.NewTrue())
- } else {
- root.Set(name, arena.NewFalse())
+ if value.Bool() {
+ return arena.NewTrue()
}
+ return arena.NewFalse()
case protoref.StringKind:
- root.Set(name, arena.NewString(data.Get(fd).String()))
+ return arena.NewString(value.String())
case protoref.Int32Kind, protoref.Int64Kind:
- root.Set(name, arena.NewNumberInt(int(data.Get(fd).Int())))
+ return arena.NewNumberInt(int(value.Int()))
case protoref.Uint32Kind, protoref.Uint64Kind:
- root.Set(name, arena.NewNumberString(fmt.Sprintf("%d", data.Get(fd).Uint())))
+ return arena.NewNumberString(fmt.Sprintf("%d", value.Uint()))
case protoref.FloatKind, protoref.DoubleKind:
- root.Set(name, arena.NewNumberFloat64(data.Get(fd).Float()))
+ return arena.NewNumberFloat64(value.Float())
case protoref.BytesKind:
- root.Set(name, arena.NewStringBytes(data.Get(fd).Bytes()))
+ return arena.NewStringBytes(value.Bytes())
case protoref.EnumKind:
enumDesc := fd.Enum()
- enumValueDesc := enumDesc.Values().ByNumber(data.Get(fd).Enum())
+ enumValueDesc := enumDesc.Values().ByNumber(value.Enum())
if enumValueDesc == nil {
- root.Set(name, arena.NewNull())
- return
+ return arena.NewNull()
}
graphqlValue, ok := d.mapping.ResolveEnumValue(string(enumDesc.Name()), string(enumValueDesc.Name()))
if !ok {
- root.Set(name, arena.NewNull())
- return
+ return arena.NewNull()
}
- root.Set(name, arena.NewString(graphqlValue))
+ return arena.NewString(graphqlValue)
}
+
+ return arena.NewNull()
}
func writeErrorBytes(err error) []byte {
@@ -244,6 +280,15 @@ func writeErrorBytes(err error) []byte {
errorItem := a.NewObject()
errorItem.Set("message", a.NewString(err.Error()))
+
+ if st, ok := status.FromError(err); ok {
+ extensions := a.NewObject()
+ extensions.Set("code", a.NewString(st.Code().String()))
+ extensions.Set("grpcStatusCode", a.NewString(st.Code().String()))
+ extensions.Set("grpcStatusMessage", a.NewString(st.Message()))
+ errorItem.Set("extensions", extensions)
+ }
+
errorArray.SetArrayItem(0, errorItem)
return errorRoot.MarshalTo(nil)
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go
index a14f5a1..d4fce08 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go
@@ -15,7 +15,9 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/grpctest"
"github.com/wundergraph/graphql-go-tools/v2/pkg/grpctest/productv1"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/encoding/protojson"
protoref "google.golang.org/protobuf/reflect/protoreflect"
@@ -99,6 +101,68 @@ func Test_DataSource_Load(t *testing.T) {
fmt.Println(output.String())
}
+func TestMarshalResponseJSONRepeatedScalar(t *testing.T) {
+ protoSchema := `
+syntax = "proto3";
+package repeated;
+
+message RepeatedResponse {
+ repeated string values = 1;
+}
+`
+
+ compiler, err := NewProtoCompiler(protoSchema, nil)
+ require.NoError(t, err)
+
+ response := compiler.doc.MessageByName("RepeatedResponse")
+ require.NotNil(t, response.Desc)
+
+ message := dynamicpb.NewMessage(response.Desc)
+ fd := response.Desc.Fields().ByName("values")
+ require.NotNil(t, fd)
+
+ repeated := message.Mutable(fd).List()
+ repeated.Append(protoref.ValueOfString("hello"))
+ repeated.Append(protoref.ValueOfString("world"))
+
+ ds := &DataSource{mapping: &GRPCMapping{}}
+ arena := astjson.Arena{}
+ rpcMessage := &RPCMessage{
+ Name: response.Name,
+ Fields: []RPCField{
+ {
+ Name: "values",
+ TypeName: string(DataTypeString),
+ JSONPath: "values",
+ Repeated: true,
+ },
+ },
+ }
+
+ value, err := ds.marshalResponseJSON(&arena, rpcMessage, message)
+ require.NoError(t, err)
+ require.JSONEq(t, `{"values":["hello","world"]}`, string(value.MarshalTo(nil)))
+}
+
+func TestWriteErrorBytesIncludesGrpcStatus(t *testing.T) {
+ err := status.Error(codes.InvalidArgument, "bad request")
+ output := writeErrorBytes(err)
+
+ var parsed struct {
+ Errors []struct {
+ Message string `json:"message"`
+ Extensions map[string]string `json:"extensions"`
+ } `json:"errors"`
+ }
+ require.NoError(t, json.Unmarshal(output, &parsed))
+ require.Len(t, parsed.Errors, 1)
+ extensions := parsed.Errors[0].Extensions
+ require.NotNil(t, extensions)
+ require.Equal(t, "InvalidArgument", extensions["code"])
+ require.Equal(t, "InvalidArgument", extensions["grpcStatusCode"])
+ require.Equal(t, "bad request", extensions["grpcStatusMessage"])
+}
+
// Test_DataSource_Load_WithMockService tests the datasource.Load method with an actual gRPC server
// TODO update this test to not use mappings anc expect no response
func Test_DataSource_Load_WithMockService(t *testing.T) {