agent_patch
runs/2026-02-21__03-36-16__gpt-5-1-codex-mini/flux-pr-1155/flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/agent-logs/agent.patch
19105 bytes
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
index 1d45769..c3ba2a2 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
+++ b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
@@ -87,7 +87,9 @@ type Planner[T Configuration] struct {
minifier *astminify.Minifier
// gRPC
- grpcClient grpc.ClientConnInterface
+ grpcClient grpc.ClientConnInterface
+ executionContext context.Context
+ grpcClientFactory func(context.Context) (grpc.ClientConnInterface, error)
}
func (p *Planner[T]) EnableSubgraphRequestMinifier() {
@@ -317,7 +319,8 @@ func (p *Planner[T]) createInputForQuery() (input, operation []byte) {
}
func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
- if p.config.fetch == nil && p.config.grpc == nil {
+ grpcEnabled := p.config.grpc != nil && !p.config.grpc.Disabled
+ if p.config.fetch == nil && !grpcEnabled {
p.stopWithError(errors.WithStack(errors.New("ConfigureFetch: fetch and grpc configuration is empty")))
return resolve.FetchConfiguration{}
}
@@ -353,8 +356,12 @@ func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
dataSource = &Source{httpClient: p.fetchClient}
- if p.config.grpc != nil {
- var err error
+ if grpcEnabled {
+ grpcClient, err := p.resolveGRPCClient()
+ if err != nil {
+ p.stopWithError(errors.WithStack(fmt.Errorf("failed to resolve grpc client: %w", err)))
+ return resolve.FetchConfiguration{}
+ }
opDocument, opReport := astparser.ParseGraphqlDocumentBytes(operation)
if opReport.HasErrors() {
@@ -362,7 +369,7 @@ func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
return resolve.FetchConfiguration{}
}
- dataSource, err = grpcdatasource.NewDataSource(p.grpcClient, grpcdatasource.DataSourceConfig{
+ dataSource, err = grpcdatasource.NewDataSource(grpcClient, grpcdatasource.DataSourceConfig{
Operation: &opDocument,
Definition: p.config.schemaConfiguration.upstreamSchemaAst,
Mapping: p.config.grpc.Mapping,
@@ -389,6 +396,32 @@ func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
}
}
+func (p *Planner[T]) resolveGRPCClient() (grpc.ClientConnInterface, error) {
+ if p.grpcClient != nil {
+ return p.grpcClient, nil
+ }
+
+ if p.grpcClientFactory == nil {
+ return nil, fmt.Errorf("grpc client is not configured")
+ }
+
+ ctx := p.executionContext
+ if ctx == nil {
+ ctx = context.Background()
+ }
+
+ client, err := p.grpcClientFactory(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ if client == nil {
+ return nil, fmt.Errorf("grpc client factory returned nil")
+ }
+
+ return client, nil
+}
+
func (p *Planner[T]) shouldSelectSingleEntity() bool {
return p.dataSourcePlannerConfig.HasRequiredFields() &&
p.dataSourcePlannerConfig.PathType == plan.PlannerPathObject
@@ -1704,6 +1737,7 @@ type Factory[T Configuration] struct {
executionContext context.Context
httpClient *http.Client
grpcClient grpc.ClientConnInterface
+ grpcClientFactory func(context.Context) (grpc.ClientConnInterface, error)
subscriptionClient GraphQLSubscriptionClient
}
@@ -1746,6 +1780,22 @@ func NewFactoryGRPC(executionContext context.Context, grpcClient grpc.ClientConn
}, nil
}
+// NewFactoryGRPCWithClientFactory creates a factory that resolves a gRPC client dynamically.
+func NewFactoryGRPCWithClientFactory(executionContext context.Context, grpcClientFactory func(context.Context) (grpc.ClientConnInterface, error)) (*Factory[Configuration], error) {
+ if executionContext == nil {
+ return nil, fmt.Errorf("execution context is required")
+ }
+
+ if grpcClientFactory == nil {
+ return nil, fmt.Errorf("grpc client factory is required")
+ }
+
+ return &Factory[Configuration]{
+ executionContext: executionContext,
+ grpcClientFactory: grpcClientFactory,
+ }, nil
+}
+
func (p *Planner[T]) getKit() *printKit {
return printKitPool.Get().(*printKit)
}
@@ -1761,6 +1811,8 @@ func (f *Factory[T]) Planner(logger abstractlogger.Logger) plan.DataSourcePlanne
fetchClient: f.httpClient,
grpcClient: f.grpcClient,
subscriptionClient: f.subscriptionClient,
+ executionContext: f.executionContext,
+ grpcClientFactory: f.grpcClientFactory,
}
}
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go b/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go
index ed8b887..8eada57 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go
@@ -397,8 +397,11 @@ func (p *RPCCompiler) buildProtoMessage(inputMessage Message, rpcMessage *RPCMes
// Handle repeated fields (arrays/lists)
if field.Repeated {
- // Get a mutable reference to the list field
- list := message.Mutable(fd.ByName(protoref.Name(field.Name))).List()
+ listField := fd.ByName(protoref.Name(field.Name))
+ if listField == nil {
+ continue
+ }
+ list := message.Mutable(listField).List()
// Extract the array elements from the JSON data
elements := data.Get(rpcField.JSONPath).Array()
@@ -406,10 +409,26 @@ func (p *RPCCompiler) buildProtoMessage(inputMessage Message, rpcMessage *RPCMes
continue
}
- // Process each element and append to the list
- for _, element := range elements {
- fieldMsg := p.buildProtoMessage(p.doc.Messages[field.MessageRef], rpcField.Message, element)
- list.Append(protoref.ValueOfMessage(fieldMsg))
+ // Determine repetition handling based on the field type
+ switch {
+ case field.MessageRef >= 0 && rpcField.Message != nil:
+ for _, element := range elements {
+ if element.Type == gjson.Null {
+ continue
+ }
+ fieldMsg := p.buildProtoMessage(p.doc.Messages[field.MessageRef], rpcField.Message, element)
+ list.Append(protoref.ValueOfMessage(fieldMsg))
+ }
+ case field.Type == DataTypeEnum:
+ for _, element := range elements {
+ if enumValue, ok := p.enumValueForField(rpcField, element); ok {
+ list.Append(protoref.ValueOfEnum(enumValue))
+ }
+ }
+ default:
+ for _, element := range elements {
+ list.Append(p.setValueForKind(field.Type, element))
+ }
}
continue
@@ -424,21 +443,11 @@ func (p *RPCCompiler) buildProtoMessage(inputMessage Message, rpcMessage *RPCMes
}
if field.Type == DataTypeEnum {
- enum, ok := p.doc.EnumByName(rpcField.EnumName)
- if !ok {
- p.report.AddInternalError(fmt.Errorf("enum %s not found in document", rpcField.EnumName))
- continue
- }
-
- for _, enumValue := range enum.Values {
- if enumValue.GraphqlValue == data.Get(rpcField.JSONPath).String() {
- message.Set(
- fd.ByName(protoref.Name(field.Name)),
- protoref.ValueOfEnum(protoref.EnumNumber(enumValue.Number)),
- )
-
- break
- }
+ if enumValue, ok := p.enumValueForField(rpcField, data.Get(rpcField.JSONPath)); ok {
+ message.Set(
+ fd.ByName(protoref.Name(field.Name)),
+ protoref.ValueOfEnum(enumValue),
+ )
}
continue
@@ -477,6 +486,32 @@ func (p *RPCCompiler) setValueForKind(kind DataType, data gjson.Result) protoref
return protoref.Value{}
}
+func (p *RPCCompiler) enumValueForField(rpcField *RPCField, value gjson.Result) (protoref.EnumNumber, bool) {
+ if rpcField == nil || rpcField.EnumName == "" {
+ return 0, false
+ }
+
+ enum, ok := p.doc.EnumByName(rpcField.EnumName)
+ if !ok {
+ p.report.AddInternalError(fmt.Errorf("enum %s not found in document", rpcField.EnumName))
+ return 0, false
+ }
+
+ graphqlValue := value.String()
+ for _, enumValue := range enum.Values {
+ if enumValue.GraphqlValue != "" {
+ if enumValue.GraphqlValue == graphqlValue {
+ return protoref.EnumNumber(enumValue.Number), true
+ }
+ }
+ if enumValue.GraphqlValue == "" && enumValue.Name == graphqlValue {
+ return protoref.EnumNumber(enumValue.Number), true
+ }
+ }
+
+ return 0, false
+}
+
// parseEnum extracts information from a protobuf enum descriptor.
func (p *RPCCompiler) parseEnum(e protoref.EnumDescriptor, mapping *GRPCMapping) Enum {
var enumValueMappings []EnumValueMapping
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/configuration.go b/app/v2/pkg/engine/datasource/grpc_datasource/configuration.go
index 4dbd48b..1c1fe37 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/configuration.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/configuration.go
@@ -32,6 +32,8 @@ type EnumValueMapping struct {
type GRPCConfiguration struct {
Mapping *GRPCMapping
Compiler *RPCCompiler
+ // Disabled prevents the gRPC datasource from being created when true
+ Disabled bool
}
// RPCConfig defines the configuration for a specific RPC operation
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
index 940d501..a47be81 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
@@ -17,6 +17,7 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"google.golang.org/grpc"
+ "google.golang.org/grpc/status"
protoref "google.golang.org/protobuf/reflect/protoreflect"
)
@@ -135,12 +136,12 @@ func (d *DataSource) marshalResponseJSON(arena *astjson.Arena, message *RPCMessa
// if oneof == nil {
// return nil, fmt.Errorf("unable to build response JSON: oneof %s not found in message %s", message.Name, message.Name)
// }
-
+ //
// oneofDescriptor := data.WhichOneof(oneof)
// if oneofDescriptor == nil {
// return nil, fmt.Errorf("unable to build response JSON: oneof %s not found in message %s", message.Name, message.Name)
// }
-
+ //
// if oneofDescriptor.Kind() == protoref.MessageKind {
// data = data.Get(oneofDescriptor).Message()
// }
@@ -157,83 +158,111 @@ func (d *DataSource) marshalResponseJSON(arena *astjson.Arena, message *RPCMessa
continue
}
+ value := data.Get(fd)
+
if fd.IsList() {
arr := arena.NewArray()
root.Set(field.JSONPath, arr)
- list := data.Get(fd).List()
+ list := value.List()
for i := 0; i < list.Len(); i++ {
- message := list.Get(i).Message()
- value, err := d.marshalResponseJSON(arena, field.Message, message)
- if err != nil {
- return nil, err
+ item := list.Get(i)
+ if fd.Kind() == protoref.MessageKind && field.Message != nil {
+ msg := item.Message()
+ if msg == nil {
+ arr.SetArrayItem(i, arena.NewNull())
+ continue
+ }
+ value, err := d.marshalResponseJSON(arena, field.Message, msg)
+ if err != nil {
+ return nil, err
+ }
+ arr.SetArrayItem(i, value)
+ continue
}
-
- arr.SetArrayItem(i, value)
+ arr.SetArrayItem(i, d.scalarJSONValue(arena, item, fd))
}
-
continue
}
if fd.Kind() == protoref.MessageKind {
- msg := data.Get(fd).Message()
- value, err := d.marshalResponseJSON(arena, field.Message, msg)
+ if field.Message == nil {
+ continue
+ }
+ msg := value.Message()
+ if msg == nil {
+ continue
+ }
+ nested, err := d.marshalResponseJSON(arena, field.Message, msg)
if err != nil {
return nil, err
}
if field.JSONPath == "" {
- root, _, err = astjson.MergeValues(root, value)
+ root, _, err = astjson.MergeValues(root, nested)
if err != nil {
return nil, err
}
} else {
- root.Set(field.JSONPath, value)
+ root.Set(field.JSONPath, nested)
}
continue
}
- d.setJSONValue(arena, root, field.JSONPath, data, fd)
+ d.setJSONValue(arena, root, field.JSONPath, value, fd)
}
return root, nil
}
-func (d *DataSource) setJSONValue(arena *astjson.Arena, root *astjson.Value, name string, data protoref.Message, fd protoref.FieldDescriptor) {
+func (d *DataSource) setJSONValue(arena *astjson.Arena, root *astjson.Value, name string, value protoref.Value, fd protoref.FieldDescriptor) {
+ root.Set(name, d.scalarJSONValue(arena, value, fd))
+}
+
+func (d *DataSource) scalarJSONValue(arena *astjson.Arena, value protoref.Value, fd protoref.FieldDescriptor) *astjson.Value {
switch fd.Kind() {
case protoref.BoolKind:
- boolValue := data.Get(fd).Bool()
- if boolValue {
- root.Set(name, arena.NewTrue())
- } else {
- root.Set(name, arena.NewFalse())
+ if value.Bool() {
+ return arena.NewTrue()
}
+ return arena.NewFalse()
case protoref.StringKind:
- root.Set(name, arena.NewString(data.Get(fd).String()))
+ return arena.NewString(value.String())
case protoref.Int32Kind, protoref.Int64Kind:
- root.Set(name, arena.NewNumberInt(int(data.Get(fd).Int())))
+ return arena.NewNumberInt(int(value.Int()))
case protoref.Uint32Kind, protoref.Uint64Kind:
- root.Set(name, arena.NewNumberString(fmt.Sprintf("%d", data.Get(fd).Uint())))
+ return arena.NewNumberString(fmt.Sprintf("%d", value.Uint()))
case protoref.FloatKind, protoref.DoubleKind:
- root.Set(name, arena.NewNumberFloat64(data.Get(fd).Float()))
+ return arena.NewNumberFloat64(value.Float())
case protoref.BytesKind:
- root.Set(name, arena.NewStringBytes(data.Get(fd).Bytes()))
+ return arena.NewStringBytes(value.Bytes())
case protoref.EnumKind:
- enumDesc := fd.Enum()
- enumValueDesc := enumDesc.Values().ByNumber(data.Get(fd).Enum())
- if enumValueDesc == nil {
- root.Set(name, arena.NewNull())
- return
- }
+ return d.enumJSONValue(arena, fd.Enum(), value.Enum())
+ default:
+ return arena.NewNull()
+ }
+}
- graphqlValue, ok := d.mapping.ResolveEnumValue(string(enumDesc.Name()), string(enumValueDesc.Name()))
- if !ok {
- root.Set(name, arena.NewNull())
- return
- }
+func (d *DataSource) enumJSONValue(arena *astjson.Arena, enumDesc protoref.EnumDescriptor, enumNumber protoref.EnumNumber) *astjson.Value {
+ if enumDesc == nil {
+ return arena.NewNull()
+ }
- root.Set(name, arena.NewString(graphqlValue))
+ enumValueDesc := enumDesc.Values().ByNumber(enumNumber)
+ if enumValueDesc == nil {
+ return arena.NewNull()
}
+
+ if d.mapping == nil {
+ return arena.NewNull()
+ }
+
+ graphqlValue, ok := d.mapping.ResolveEnumValue(string(enumDesc.Name()), string(enumValueDesc.Name()))
+ if !ok {
+ return arena.NewNull()
+ }
+
+ return arena.NewString(graphqlValue)
}
func writeErrorBytes(err error) []byte {
@@ -244,6 +273,13 @@ func writeErrorBytes(err error) []byte {
errorItem := a.NewObject()
errorItem.Set("message", a.NewString(err.Error()))
+ if st, ok := status.FromError(err); ok {
+ ext := a.NewObject()
+ ext.Set("code", a.NewString(st.Code().String()))
+ ext.Set("grpcStatusCode", a.NewNumberInt(int(st.Code())))
+ ext.Set("grpcStatusMessage", a.NewString(st.Message()))
+ errorItem.Set("extensions", ext)
+ }
errorArray.SetArrayItem(0, errorItem)
return errorRoot.MarshalTo(nil)
diff --git a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go
index a14f5a1..f34c796 100644
--- a/tmp/agent-patch-flux-pr-1155.1-of-1.2026-03-05__17-48-26__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_test.go
@@ -15,6 +15,7 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/grpctest"
"github.com/wundergraph/graphql-go-tools/v2/pkg/grpctest/productv1"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/encoding/protojson"
@@ -413,7 +414,8 @@ func Test_DataSource_Load_WithGrpcError(t *testing.T) {
// 8. Parse the response JSON for more detailed validation
var response struct {
Errors []struct {
- Message string `json:"message"`
+ Message string `json:"message"`
+ Extensions map[string]interface{} `json:"extensions"`
} `json:"errors"`
}
@@ -425,6 +427,11 @@ func Test_DataSource_Load_WithGrpcError(t *testing.T) {
// Verify the error message
require.Contains(t, response.Errors[0].Message, "user not found: error-user")
+
+ require.NotNil(t, response.Errors[0].Extensions, "Expected extensions to be present")
+ require.Equal(t, "NotFound", response.Errors[0].Extensions["code"])
+ require.Equal(t, float64(codes.NotFound), response.Errors[0].Extensions["grpcStatusCode"])
+ require.Equal(t, "user not found: error-user", response.Errors[0].Extensions["grpcStatusMessage"])
}
func TestMarshalResponseJSON(t *testing.T) {
@@ -493,6 +500,46 @@ func TestMarshalResponseJSON(t *testing.T) {
require.Equal(t, `{"_entities":[{"__typename":"Product","id":"123","name_different":"test","price_different":123.45}]}`, responseJSON.String())
}
+func TestMarshalResponseJSONRepeatedScalars(t *testing.T) {
+ protoSchema := `
+syntax = "proto3";
+package test;
+
+message RepeatedResponse {
+ repeated string tags = 1;
+ repeated int32 numbers = 2;
+}
+`
+
+ compiler, err := NewProtoCompiler(protoSchema, nil)
+ require.NoError(t, err)
+
+ messageDesc := compiler.doc.MessageByName("RepeatedResponse").Desc
+ responseMessage := dynamicpb.NewMessage(messageDesc)
+
+ tagsList := responseMessage.Mutable(messageDesc.Fields().ByName("tags")).List()
+ tagsList.Append(protoref.ValueOfString("a"))
+ tagsList.Append(protoref.ValueOfString("b"))
+
+ numsList := responseMessage.Mutable(messageDesc.Fields().ByName("numbers")).List()
+ numsList.Append(protoref.ValueOfInt32(1))
+ numsList.Append(protoref.ValueOfInt32(2))
+
+ arena := astjson.Arena{}
+ responseMetadata := RPCMessage{
+ Name: "RepeatedResponse",
+ Fields: []RPCField{
+ {Name: "tags", TypeName: string(DataTypeString), JSONPath: "tags", Repeated: true},
+ {Name: "numbers", TypeName: string(DataTypeInt32), JSONPath: "numbers", Repeated: true},
+ },
+ }
+
+ ds := &DataSource{}
+ responseJSON, err := ds.marshalResponseJSON(&arena, &responseMetadata, responseMessage)
+ require.NoError(t, err)
+ require.Equal(t, `{"tags":["a","b"],"numbers":[1,2]}`, responseJSON.String())
+}
+
// TODO test interface types
// Test_DataSource_Load_WithAnimalInterface tests the datasource with Animal interface types (Cat/Dog)
// using a bufconn connection to the mock service