agent_patch
runs/2026-02-28__13-38-10__gpt-5-3-codex/flux-pr-1262/flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/agent-logs/agent.patch
47962 bytes
diff --git a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go b/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go
index ee49e81..020c40d 100644
--- a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/compiler.go
@@ -3,6 +3,7 @@ package grpcdatasource
import (
"context"
"fmt"
+ "strings"
"github.com/bufbuild/protocompile"
"github.com/tidwall/gjson"
@@ -330,12 +331,16 @@ type Invocation struct {
Input *dynamicpb.Message
Output *dynamicpb.Message
Call *RPCCall
+ // EntityResultIndexes stores, for entity calls, the index of each assigned representation in the original representations list.
+ // It is empty for non-entity invocations.
+ EntityResultIndexes []int
}
// Compile processes an RPCExecutionPlan and builds protobuf messages from JSON data
// based on the compiled schema.
func (p *RPCCompiler) Compile(executionPlan *RPCExecutionPlan, inputData gjson.Result) ([]Invocation, error) {
invocations := make([]Invocation, 0, len(executionPlan.Calls))
+ entityAssignments := p.assignEntityRepresentations(executionPlan, inputData.Get("representations").Array())
for _, call := range executionPlan.Calls {
inputMessage, ok := p.doc.MessageByName(call.Request.Name)
@@ -348,7 +353,18 @@ func (p *RPCCompiler) Compile(executionPlan *RPCExecutionPlan, inputData gjson.R
return nil, fmt.Errorf("output message %s not found in document", call.Response.Name)
}
- request := p.buildProtoMessage(inputMessage, &call.Request, inputData)
+ requestData := inputData
+ entityIndexes := []int(nil)
+ if call.Entity != nil {
+ assignment, ok := entityAssignments[call.CallID]
+ if !ok || len(assignment.rawRepresentations) == 0 {
+ continue
+ }
+ entityIndexes = assignment.indexes
+ requestData = buildEntityInputData(assignment.rawRepresentations)
+ }
+
+ request := p.buildProtoMessage(inputMessage, &call.Request, requestData)
response := p.newEmptyMessage(outputMessage)
if p.report.HasErrors() {
@@ -361,17 +377,87 @@ func (p *RPCCompiler) Compile(executionPlan *RPCExecutionPlan, inputData gjson.R
}
invocations = append(invocations, Invocation{
- ServiceName: serviceName,
- MethodName: call.MethodName,
- Input: request,
- Output: response,
- Call: &call,
+ ServiceName: serviceName,
+ MethodName: call.MethodName,
+ Input: request,
+ Output: response,
+ Call: &call,
+ EntityResultIndexes: entityIndexes,
})
}
return invocations, nil
}
+type entityAssignment struct {
+ rawRepresentations []string
+ indexes []int
+}
+
+func (p *RPCCompiler) assignEntityRepresentations(executionPlan *RPCExecutionPlan, representations []gjson.Result) map[int]entityAssignment {
+ assignments := make(map[int]entityAssignment)
+ if len(representations) == 0 {
+ return assignments
+ }
+
+ entityCallsByType := make(map[string][]RPCCall)
+ for _, call := range executionPlan.Calls {
+ if call.Entity == nil {
+ continue
+ }
+ entityCallsByType[call.Entity.TypeName] = append(entityCallsByType[call.Entity.TypeName], call)
+ }
+
+ for index, representation := range representations {
+ typeName := representation.Get("__typename").String()
+ calls := entityCallsByType[typeName]
+ if len(calls) == 0 {
+ continue
+ }
+
+ for _, call := range calls {
+ if !representationMatchesKeyFields(representation, call.Entity.KeyFields) {
+ continue
+ }
+
+ assignment := assignments[call.CallID]
+ assignment.rawRepresentations = append(assignment.rawRepresentations, representation.Raw)
+ assignment.indexes = append(assignment.indexes, index)
+ assignments[call.CallID] = assignment
+ break
+ }
+ }
+
+ return assignments
+}
+
+func representationMatchesKeyFields(representation gjson.Result, keyFields []string) bool {
+ if len(keyFields) == 0 {
+ return false
+ }
+
+ for _, keyField := range keyFields {
+ if !representation.Get(keyField).Exists() {
+ return false
+ }
+ }
+
+ return true
+}
+
+func buildEntityInputData(rawRepresentations []string) gjson.Result {
+ var builder strings.Builder
+ builder.WriteString(`{"representations":[`)
+ for i, raw := range rawRepresentations {
+ if i > 0 {
+ builder.WriteByte(',')
+ }
+ builder.WriteString(raw)
+ }
+ builder.WriteString(`]}`)
+ return gjson.Parse(builder.String())
+}
+
func (p *RPCCompiler) resolveServiceName(methodName string) (string, bool) {
for _, service := range p.doc.Services {
for _, methodRef := range service.MethodsRefs {
diff --git a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/configuration.go b/app/v2/pkg/engine/datasource/grpc_datasource/configuration.go
index 6ee1940..93445a2 100644
--- a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/configuration.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/configuration.go
@@ -17,7 +17,7 @@ type GRPCMapping struct {
// SubscriptionRPCs maps GraphQL subscription fields to the corresponding gRPC RPC configurations
SubscriptionRPCs RPCConfigMap
// EntityRPCs defines how GraphQL types are resolved as entities using specific RPCs
- EntityRPCs map[string]EntityRPCConfig
+ EntityRPCs map[string][]EntityRPCConfig
// Fields defines the field mappings between GraphQL types and gRPC messages
Fields map[string]FieldMap
// EnumValues defines the enum values for each enum type
diff --git a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan.go b/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan.go
index ad41e4b..6245e72 100644
--- a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan.go
@@ -68,6 +68,14 @@ type RPCCall struct {
Request RPCMessage
// Response contains the message structure for the gRPC response
Response RPCMessage
+ // Entity contains federation entity lookup metadata.
+ // It is set only for _entities calls.
+ Entity *FederationEntityCall
+}
+
+type FederationEntityCall struct {
+ TypeName string
+ KeyFields []string
}
// RPCMessage represents a gRPC message structure for requests and responses.
diff --git a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan_federation_test.go b/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan_federation_test.go
index 5756d9d..d5ed4b3 100644
--- a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan_federation_test.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan_federation_test.go
@@ -21,13 +21,15 @@ func TestEntityLookup(t *testing.T) {
query: `query EntityLookup($representations: [_Any!]!) { _entities(representations: $representations) { ... on Product { __typename id name price } } }`,
mapping: &GRPCMapping{
Service: "Products",
- EntityRPCs: map[string]EntityRPCConfig{
+ EntityRPCs: map[string][]EntityRPCConfig{
"Product": {
- Key: "id",
- RPCConfig: RPCConfig{
- RPC: "LookupProductById",
- Request: "LookupProductByIdRequest",
- Response: "LookupProductByIdResponse",
+ {
+ Key: "id",
+ RPCConfig: RPCConfig{
+ RPC: "LookupProductById",
+ Request: "LookupProductByIdRequest",
+ Response: "LookupProductByIdResponse",
+ },
},
},
},
@@ -35,9 +37,13 @@ func TestEntityLookup(t *testing.T) {
expectedPlan: &RPCExecutionPlan{
Calls: []RPCCall{
{
+ CallID: 0,
ServiceName: "Products",
MethodName: "LookupProductById",
- // Define the structure of the request message
+ Entity: &FederationEntityCall{
+ TypeName: "Product",
+ KeyFields: []string{"id"},
+ },
Request: RPCMessage{
Name: "LookupProductByIdRequest",
Fields: []RPCField{
@@ -59,7 +65,6 @@ func TestEntityLookup(t *testing.T) {
},
},
},
- // Define the structure of the response message
Response: RPCMessage{
Name: "LookupProductByIdResponse",
Fields: []RPCField{
@@ -77,21 +82,9 @@ func TestEntityLookup(t *testing.T) {
JSONPath: "__typename",
StaticValue: "Product",
},
- {
- Name: "id",
- TypeName: string(DataTypeString),
- JSONPath: "id",
- },
- {
- Name: "name",
- TypeName: string(DataTypeString),
- JSONPath: "name",
- },
- {
- Name: "price",
- TypeName: string(DataTypeDouble),
- JSONPath: "price",
- },
+ {Name: "id", TypeName: string(DataTypeString), JSONPath: "id"},
+ {Name: "name", TypeName: string(DataTypeString), JSONPath: "name"},
+ {Name: "price", TypeName: string(DataTypeDouble), JSONPath: "price"},
},
},
},
@@ -101,225 +94,72 @@ func TestEntityLookup(t *testing.T) {
},
},
},
-
- // TODO implement multiple entity lookup types
- // {
- // name: "Should create an execution plan for an entity lookup multiple types",
- // query: `
- // query EntityLookup($representations: [_Any!]!) {
- // _entities(representations: $representations) {
- // ... on Product {
- // id
- // name
- // price
- // }
- // ... on Storage {
- // id
- // name
- // location
- // }
- // }
- // }
- // `,
- // expectedPlan: &RPCExecutionPlan{
- // Groups: []RPCCallGroup{
- // {
- // Calls: []RPCCall{
- // {
- // ServiceName: "Products",
- // MethodName: "LookupProductById",
- // // Define the structure of the request message
- // Request: RPCMessage{
- // Name: "LookupProductByIdRequest",
- // Fields: []RPCField{
- // {
- // Name: "inputs",
- // TypeName: string(DataTypeMessage),
- // Repeated: true,
- // JSONPath: "representations", // Path to extract data from GraphQL variables
- //
-
- // Message: &RPCMessage{
- // Name: "LookupProductByIdInput",
- // Fields: []RPCField{
- // {
- // Name: "key",
- // TypeName: string(DataTypeMessage),
- //
-
- // Message: &RPCMessage{
- // Name: "ProductByIdKey",
- // Fields: []RPCField{
- // {
- // Name: "id",
- // TypeName: string(DataTypeString),
- // JSONPath: "id", // Extract 'id' from each representation
- //
-
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // // Define the structure of the response message
- // Response: RPCMessage{
- // Name: "LookupProductByIdResponse",
- // Fields: []RPCField{
- // {
- // Name: "results",
- // TypeName: string(DataTypeMessage),
- // Repeated: true,
- //
-
- // JSONPath: "results",
- // Message: &RPCMessage{
- // Name: "LookupProductByIdResult",
- // Fields: []RPCField{
- // {
- // Name: "product",
- // TypeName: string(DataTypeMessage),
- //
-
- // Message: &RPCMessage{
- // Name: "Product",
- // Fields: []RPCField{
- // {
- // Name: "id",
- // TypeName: string(DataTypeString),
- // JSONPath: "id",
- // },
- // {
- // Name: "name",
- // TypeName: string(DataTypeString),
- // JSONPath: "name",
- // },
- // {
- // Name: "price",
- // TypeName: string(DataTypeFloat),
- // JSONPath: "price",
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // {
- // ServiceName: "Products",
- // MethodName: "LookupStorageById",
- // Request: RPCMessage{
- // Name: "LookupStorageByIdRequest",
- // Fields: []RPCField{
- // {
- // Name: "inputs",
- // TypeName: string(DataTypeMessage),
- // Repeated: true,
- // JSONPath: "representations",
- // Message: &RPCMessage{
- // Name: "LookupStorageByIdInput",
- // Fields: []RPCField{
- // {
- // Name: "key",
- // TypeName: string(DataTypeMessage),
- // Message: &RPCMessage{
- // Name: "StorageByIdKey",
- // Fields: []RPCField{
- // {
- // Name: "id",
- // TypeName: string(DataTypeString),
- // JSONPath: "id",
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // Response: RPCMessage{
- // Name: "LookupStorageByIdResponse",
- // Fields: []RPCField{
- // {
- // Name: "results",
- // TypeName: string(DataTypeMessage),
- // Repeated: true,
- // JSONPath: "results",
- // Message: &RPCMessage{
- // Name: "LookupStorageByIdResult",
- // Fields: []RPCField{
- // {
- // Name: "storage",
- // TypeName: string(DataTypeMessage),
- // Message: &RPCMessage{
- // Name: "Storage",
- // Fields: []RPCField{
- // {
- // Name: "id",
- // TypeName: string(DataTypeString),
- // JSONPath: "id",
- // },
- // {
- // Name: "name",
- // TypeName: string(DataTypeString),
- // JSONPath: "name",
- // },
- // {
- // Name: "location",
- // TypeName: string(DataTypeString),
- // JSONPath: "location",
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
- // },
+ {
+ name: "Should create one call per entity type in a single entity lookup",
+ query: `query EntityLookup($representations: [_Any!]!) {
+ _entities(representations: $representations) {
+ ... on Product { id name price }
+ ... on Storage { id name location }
+ }
+ }`,
+ mapping: &GRPCMapping{
+ Service: "Products",
+ EntityRPCs: map[string][]EntityRPCConfig{
+ "Product": {
+ {Key: "id", RPCConfig: RPCConfig{RPC: "LookupProductById", Request: "LookupProductByIdRequest", Response: "LookupProductByIdResponse"}},
+ },
+ "Storage": {
+ {Key: "id", RPCConfig: RPCConfig{RPC: "LookupStorageById", Request: "LookupStorageByIdRequest", Response: "LookupStorageByIdResponse"}},
+ },
+ },
+ },
+ expectedPlan: &RPCExecutionPlan{
+ Calls: []RPCCall{
+ {
+ CallID: 0,
+ ServiceName: "Products",
+ MethodName: "LookupProductById",
+ Entity: &FederationEntityCall{TypeName: "Product", KeyFields: []string{"id"}},
+ Request: RPCMessage{Name: "LookupProductByIdRequest", Fields: []RPCField{{Name: "keys", TypeName: string(DataTypeMessage), Repeated: true, JSONPath: "representations", Message: &RPCMessage{Name: "LookupProductByIdKey", Fields: []RPCField{{Name: "id", TypeName: string(DataTypeString), JSONPath: "id"}}}}}},
+ Response: RPCMessage{Name: "LookupProductByIdResponse", Fields: []RPCField{{Name: "result", TypeName: string(DataTypeMessage), Repeated: true, JSONPath: "_entities", Message: &RPCMessage{Name: "Product", Fields: []RPCField{{Name: "id", TypeName: string(DataTypeString), JSONPath: "id"}, {Name: "name", TypeName: string(DataTypeString), JSONPath: "name"}, {Name: "price", TypeName: string(DataTypeDouble), JSONPath: "price"}}}}}},
+ },
+ {
+ CallID: 1,
+ ServiceName: "Products",
+ MethodName: "LookupStorageById",
+ Entity: &FederationEntityCall{TypeName: "Storage", KeyFields: []string{"id"}},
+ Request: RPCMessage{Name: "LookupStorageByIdRequest", Fields: []RPCField{{Name: "keys", TypeName: string(DataTypeMessage), Repeated: true, JSONPath: "representations", Message: &RPCMessage{Name: "LookupStorageByIdKey", Fields: []RPCField{{Name: "id", TypeName: string(DataTypeString), JSONPath: "id"}}}}}},
+ Response: RPCMessage{Name: "LookupStorageByIdResponse", Fields: []RPCField{{Name: "result", TypeName: string(DataTypeMessage), Repeated: true, JSONPath: "_entities", Message: &RPCMessage{Name: "Storage", Fields: []RPCField{{Name: "id", TypeName: string(DataTypeString), JSONPath: "id"}, {Name: "name", TypeName: string(DataTypeString), JSONPath: "name"}, {Name: "location", TypeName: string(DataTypeString), JSONPath: "location"}}}}}},
+ },
+ },
+ },
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
- // Parse the GraphQL schema
schemaDoc := grpctest.MustGraphQLSchema(t)
- // Parse the GraphQL query
queryDoc, report := astparser.ParseGraphqlDocumentString(tt.query)
if report.HasErrors() {
t.Fatalf("failed to parse query: %s", report.Error())
}
walker := astvisitor.NewWalker(48)
-
rpcPlanVisitor := newRPCPlanVisitor(&walker, rpcPlanVisitorConfig{
subgraphName: "Products",
mapping: tt.mapping,
})
walker.Walk(&queryDoc, &schemaDoc, &report)
-
if report.HasErrors() {
t.Fatalf("failed to walk AST: %s", report.Error())
}
diff := cmp.Diff(tt.expectedPlan, rpcPlanVisitor.plan)
if diff != "" {
- t.Fatalf("execution plan mismatch: %s", diff)
+ t.Fatalf("plan mismatch (-want +got):\n%s", diff)
}
})
}
diff --git a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan_visitor.go b/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan_visitor.go
index edf2dd8..9af005f 100644
--- a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan_visitor.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/execution_plan_visitor.go
@@ -2,6 +2,7 @@ package grpcdatasource
import (
"fmt"
+ "slices"
"strings"
"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
@@ -18,12 +19,20 @@ type keyField struct {
type entityInfo struct {
name string
- keyFields []keyField
- keyTypeName string
+ keyConfigs []entityKeyConfig
entityRootFieldRef int
entityInlineFragmentRef int
}
+type entityKeyConfig struct {
+ keyFields []keyField
+ keyFieldNames []string
+ keyTypeName string
+ methodName string
+ requestName string
+ responseName string
+}
+
type planningInfo struct {
entityInfo entityInfo
// resolvers []string
@@ -53,7 +62,6 @@ type rpcPlanVisitor struct {
plan *RPCExecutionPlan
operationDefinitionRef int
operationFieldRef int
- operationFieldRefs []int
currentCall *RPCCall
currentCallID int
}
@@ -100,15 +108,13 @@ func (r *rpcPlanVisitor) EnterDocument(operation *ast.Document, definition *ast.
// which requires special handling.
func (r *rpcPlanVisitor) EnterOperationDefinition(ref int) {
r.operationDefinitionRef = ref
-
- // Retrieves the fields from the root selection set.
- // These fields determine the names for the RPC functions to call.
- // TODO: handle fragments on root level `... on Query {}`
- selectionSetRef := r.operation.OperationDefinitions[ref].SelectionSet
- r.operationFieldRefs = r.operation.SelectionSetFieldSelections(selectionSetRef)
-
- r.plan.Calls = make([]RPCCall, len(r.operationFieldRefs))
+ r.plan.Calls = make([]RPCCall, 0)
r.planInfo.operationType = r.operation.OperationDefinitions[ref].OperationType
+ r.planInfo.entityInfo = entityInfo{
+ entityRootFieldRef: -1,
+ entityInlineFragmentRef: -1,
+ }
+ r.planInfo.isEntityLookup = false
}
// EnterArgument implements astvisitor.EnterArgumentVisitor.
@@ -122,7 +128,11 @@ func (r *rpcPlanVisitor) EnterArgument(ref int) {
}
a := r.walker.Ancestor()
- if a.Kind != ast.NodeKindField && a.Ref != r.operationFieldRef {
+ if a.Kind != ast.NodeKindField || a.Ref != r.operationFieldRef {
+ return
+ }
+
+ if r.planInfo.currentRequestMessage == nil {
return
}
argumentInputValueDefinitionRef, exists := r.walker.ArgumentInputValueDefinition(ref)
@@ -145,6 +155,10 @@ func (r *rpcPlanVisitor) EnterSelectionSet(ref int) {
return
}
+ if r.planInfo.currentResponseMessage == nil {
+ return
+ }
+
if len(r.planInfo.currentResponseMessage.Fields) == 0 || len(r.planInfo.currentResponseMessage.Fields) <= r.planInfo.currentResponseFieldIndex {
return
}
@@ -235,8 +249,23 @@ func (r *rpcPlanVisitor) EnterInlineFragment(ref int) {
entityInfo := &r.planInfo.entityInfo
if entityInfo.entityRootFieldRef != -1 && entityInfo.entityInlineFragmentRef == -1 {
entityInfo.entityInlineFragmentRef = ref
- r.resolveEntityInformation(ref)
- r.scaffoldEntityLookup()
+ if err := r.resolveEntityInformation(ref); err != nil {
+ r.walker.StopWithInternalErr(err)
+ return
+ }
+ if len(entityInfo.keyConfigs) == 0 {
+ return
+ }
+
+ r.currentCall = &RPCCall{
+ CallID: r.currentCallID,
+ ServiceName: r.resolveServiceName(),
+ }
+ r.planInfo.currentRequestMessage = &r.currentCall.Request
+ r.planInfo.currentResponseMessage = &r.currentCall.Response
+ r.planInfo.currentResponseFieldIndex = 0
+
+ r.scaffoldEntityLookup(entityInfo.keyConfigs[0])
return
}
@@ -245,7 +274,12 @@ func (r *rpcPlanVisitor) EnterInlineFragment(ref int) {
// LeaveInlineFragment implements astvisitor.InlineFragmentVisitor.
func (r *rpcPlanVisitor) LeaveInlineFragment(ref int) {
if ref == r.planInfo.entityInfo.entityInlineFragmentRef {
+ if r.currentCall != nil && len(r.planInfo.entityInfo.keyConfigs) > 0 {
+ r.appendEntityCallsForCurrentInlineFragment()
+ }
+
r.planInfo.entityInfo.entityInlineFragmentRef = -1
+ r.planInfo.entityInfo.keyConfigs = nil
}
}
@@ -253,6 +287,20 @@ func (r *rpcPlanVisitor) IsRootField() bool {
return len(r.walker.Ancestors) == 2 && r.walker.Ancestors[0].Kind == ast.NodeKindOperationDefinition
}
+func (r *rpcPlanVisitor) isTopLevelField() bool {
+ hasOperationAncestor := false
+ for _, ancestor := range r.walker.Ancestors {
+ switch ancestor.Kind {
+ case ast.NodeKindField:
+ return false
+ case ast.NodeKindOperationDefinition:
+ hasOperationAncestor = true
+ }
+ }
+
+ return hasOperationAncestor
+}
+
func (r *rpcPlanVisitor) IsInlineFragmentField() (int, bool) {
if len(r.walker.Ancestors) < 2 {
return -1, false
@@ -269,6 +317,7 @@ func (r *rpcPlanVisitor) IsInlineFragmentField() (int, bool) {
func (r *rpcPlanVisitor) handleRootField(ref int) error {
r.operationFieldRef = ref
r.planInfo.operationFieldName = r.operation.FieldNameString(ref)
+ r.planInfo.methodName = ""
r.currentCall = &RPCCall{
CallID: r.currentCallID,
@@ -289,28 +338,28 @@ func (r *rpcPlanVisitor) handleRootField(ref int) error {
// EnterField implements astvisitor.EnterFieldVisitor.
func (r *rpcPlanVisitor) EnterField(ref int) {
fieldName := r.operation.FieldNameString(ref)
- if r.IsRootField() {
- if err := r.handleRootField(ref); err != nil {
- r.walker.StopWithInternalErr(err)
- return
- }
- }
-
- if fieldName == "_entities" {
- // _entities is a special field that is used to look up entities
- // Entity lookups are handled differently as we use special types for
- // Providing variables (_Any) and the response type is a Union that needs to be
- // determined from the first inline fragment.
+ isRootField := r.isTopLevelField()
+ if isRootField && fieldName == "_entities" {
r.planInfo.entityInfo = entityInfo{
entityRootFieldRef: ref,
entityInlineFragmentRef: -1,
}
r.planInfo.isEntityLookup = true
- r.planInfo.entityInfo.entityRootFieldRef = ref
return
}
+ if isRootField {
+ if err := r.handleRootField(ref); err != nil {
+ r.walker.StopWithInternalErr(err)
+ return
+ }
+ }
+
// prevent duplicate fields
+ if r.planInfo.currentResponseMessage == nil {
+ return
+ }
+
fieldAlias := r.operation.FieldAliasString(ref)
if r.planInfo.currentResponseMessage.Fields.Exists(fieldName, fieldAlias) {
return
@@ -382,15 +431,25 @@ func (r *rpcPlanVisitor) buildField(fd int, fieldName, fieldAlias string) RPCFie
func (r *rpcPlanVisitor) LeaveField(ref int) {
if ref == r.planInfo.entityInfo.entityRootFieldRef {
r.planInfo.entityInfo.entityRootFieldRef = -1
+ r.planInfo.isEntityLookup = false
+ r.planInfo.currentResponseFieldIndex = 0
+ r.planInfo.currentRequestMessage = nil
+ r.planInfo.currentResponseMessage = nil
+ r.currentCall = nil
+ return
}
- // If we are not in the operation field, we can increment the response field index.
- if !r.IsRootField() {
+ // If the field being left is not a top-level operation field, only advance the response field index.
+ if !r.isTopLevelField() {
r.planInfo.currentResponseFieldIndex++
return
}
// If we left the operation field, we need to finalize the current call and prepare the next one.
+ if r.currentCall == nil {
+ return
+ }
+
if r.currentCall.MethodName == "" {
methodName := r.rpcMethodName()
r.currentCall.MethodName = methodName
@@ -398,14 +457,11 @@ func (r *rpcPlanVisitor) LeaveField(ref int) {
r.currentCall.Response.Name = methodName + "Response"
}
- r.plan.Calls[r.currentCallID] = *r.currentCall
- r.currentCall = &RPCCall{}
+ r.plan.Calls = append(r.plan.Calls, *r.currentCall)
+ r.currentCall = nil
r.currentCallID++
- if r.currentCallID < len(r.operationFieldRefs) {
- r.operationFieldRef = r.operationFieldRefs[r.currentCallID]
- }
-
+ r.operationFieldRef = -1
r.planInfo.currentResponseFieldIndex = 0
}
@@ -584,95 +640,145 @@ func (r *rpcPlanVisitor) createListMetadata(typeRef int) *ListMetadata {
return md
}
-func (r *rpcPlanVisitor) resolveEntityInformation(inlineFragmentRef int) {
- // TODO support multiple entities in a single query
- if !r.planInfo.isEntityLookup || r.planInfo.entityInfo.name != "" {
- return
+func (r *rpcPlanVisitor) resolveEntityInformation(inlineFragmentRef int) error {
+ if !r.planInfo.isEntityLookup {
+ return nil
}
fragmentName := r.operation.InlineFragmentTypeConditionNameString(inlineFragmentRef)
node, found := r.definition.NodeByNameStr(fragmentName)
if !found {
- return
+ return nil
}
// Only process object type definitions
// TODO: handle interfaces
if node.Kind != ast.NodeKindObjectTypeDefinition {
- return
+ return nil
}
// An entity must at least have a key directive
def := r.definition.ObjectTypeDefinitions[node.Ref]
if !def.HasDirectives {
- return
+ return nil
}
- // TODO: We currently only support one key directive per entity
- // We need to get the used key from the graphql datasource.
+ entityInfo := &r.planInfo.entityInfo
+ entityInfo.name = fragmentName
+
+ schemaKeyConfigs := make([]entityKeyConfig, 0)
for _, directiveRef := range def.Directives.Refs {
if r.definition.DirectiveNameString(directiveRef) != federationKeyDirectiveName {
continue
}
- r.planInfo.entityInfo.name = fragmentName
-
directive := r.definition.Directives[directiveRef]
+ var keyFields []keyField
for _, argRef := range directive.Arguments.Refs {
if r.definition.ArgumentNameString(argRef) != "fields" {
continue
}
argument := r.definition.Arguments[argRef]
- keyFieldName := r.definition.ValueContentString(argument.Value)
+ keyFieldNames := parseKeyDirectiveFields(r.definition.ValueContentString(argument.Value))
+ for _, keyFieldName := range keyFieldNames {
+ fieldDef, ok := r.definition.NodeFieldDefinitionByName(node, ast.ByteSlice(keyFieldName))
+ if !ok {
+ r.walker.Report.AddExternalError(operationreport.ExternalError{
+ Message: fmt.Sprintf("Field %s not found in definition", keyFieldName),
+ })
+ return nil
+ }
+
+ fdt := r.definition.FieldDefinitionType(fieldDef)
+ ft := r.definition.Types[fdt]
+
+ keyFields = append(keyFields, keyField{
+ fieldName: keyFieldName,
+ fieldType: r.toDataType(&ft).String(),
+ })
+ }
+ }
+
+ if len(keyFields) == 0 {
+ continue
+ }
+ keyFieldNames := make([]string, 0, len(keyFields))
+ for _, k := range keyFields {
+ keyFieldNames = append(keyFieldNames, k.fieldName)
+ }
+
+ schemaKeyConfigs = append(schemaKeyConfigs, entityKeyConfig{
+ keyFields: keyFields,
+ keyFieldNames: keyFieldNames,
+ keyTypeName: fragmentName + "By" + strings.Join(titleSlice(slices.Clone(keyFieldNames)), "And"),
+ })
+ }
+
+ entityInfo.keyConfigs = r.resolveEntityKeyConfigsFromMapping(fragmentName, node, schemaKeyConfigs)
+ return nil
+}
+
+func (r *rpcPlanVisitor) resolveEntityKeyConfigsFromMapping(typeName string, node ast.Node, fallback []entityKeyConfig) []entityKeyConfig {
+ if r.mapping == nil || len(r.mapping.EntityRPCs[typeName]) == 0 {
+ return fallback
+ }
+
+ configs := make([]entityKeyConfig, 0, len(r.mapping.EntityRPCs[typeName]))
+ for _, cfg := range r.mapping.EntityRPCs[typeName] {
+ if cfg.Key == "" {
+ continue
+ }
+
+ keyFieldNames := parseKeyDirectiveFields(cfg.Key)
+ keyFields := make([]keyField, 0, len(keyFieldNames))
+ for _, keyFieldName := range keyFieldNames {
fieldDef, ok := r.definition.NodeFieldDefinitionByName(node, ast.ByteSlice(keyFieldName))
if !ok {
- r.walker.Report.AddExternalError(operationreport.ExternalError{
- Message: fmt.Sprintf("Field %s not found in definition", keyFieldName),
- })
- return
+ continue
}
-
fdt := r.definition.FieldDefinitionType(fieldDef)
ft := r.definition.Types[fdt]
- r.planInfo.entityInfo.keyFields =
- append(r.planInfo.entityInfo.keyFields, keyField{
- fieldName: keyFieldName,
- fieldType: r.toDataType(&ft).String(),
- })
+ keyFields = append(keyFields, keyField{
+ fieldName: keyFieldName,
+ fieldType: r.toDataType(&ft).String(),
+ })
}
- break
- }
+ if len(keyFields) == 0 {
+ continue
+ }
- keyFields := make([]string, 0, len(r.planInfo.entityInfo.keyFields))
- for _, key := range r.planInfo.entityInfo.keyFields {
- keyFields = append(keyFields, key.fieldName)
+ configs = append(configs, entityKeyConfig{
+ keyFields: keyFields,
+ keyFieldNames: keyFieldNames,
+ keyTypeName: typeName + "By" + strings.Join(titleSlice(slices.Clone(keyFieldNames)), "And"),
+ methodName: cfg.RPC,
+ requestName: cfg.Request,
+ responseName: cfg.Response,
+ })
}
- if ei, exists := r.mapping.EntityRPCs[r.planInfo.entityInfo.name]; exists {
- r.currentCall.Request.Name = ei.RPCConfig.Request
- r.currentCall.Response.Name = ei.RPCConfig.Response
- r.planInfo.methodName = ei.RPCConfig.RPC
+ if len(configs) == 0 {
+ return fallback
}
- r.planInfo.entityInfo.keyTypeName = r.planInfo.entityInfo.name + "By" + strings.Join(titleSlice(keyFields), "And")
+ return uniqueEntityKeyConfigs(configs)
}
// scaffoldEntityLookup creates the entity lookup call structure
// by creating the key field message and adding it to the current request message.
// It also adds the results message to the current response message.
-func (r *rpcPlanVisitor) scaffoldEntityLookup() {
+func (r *rpcPlanVisitor) scaffoldEntityLookup(config entityKeyConfig) {
if !r.planInfo.isEntityLookup {
return
}
- entityInfo := &r.planInfo.entityInfo
keyFieldMessage := &RPCMessage{
- Name: r.rpcMethodName() + "Key",
+ Name: r.entityMethodName(config) + "Key",
}
- for _, key := range entityInfo.keyFields {
+ for _, key := range config.keyFields {
keyFieldMessage.Fields = append(keyFieldMessage.Fields, RPCField{
Name: key.fieldName,
TypeName: key.fieldType,
@@ -690,6 +796,14 @@ func (r *rpcPlanVisitor) scaffoldEntityLookup() {
},
}
+ r.currentCall.MethodName = r.entityMethodName(config)
+ r.currentCall.Request.Name = r.entityRequestName(config)
+ r.currentCall.Response.Name = r.entityResponseName(config)
+ r.currentCall.Entity = &FederationEntityCall{
+ TypeName: r.planInfo.entityInfo.name,
+ KeyFields: slices.Clone(config.keyFieldNames),
+ }
+
// The proto response message has a field `result` which is a list of entities.
// As this is a special case we directly map it to _entities.
r.planInfo.currentResponseMessage.Fields = []RPCField{
@@ -702,6 +816,76 @@ func (r *rpcPlanVisitor) scaffoldEntityLookup() {
}
}
+func (r *rpcPlanVisitor) appendEntityCallsForCurrentInlineFragment() {
+ // Append the active call (built from the first key configuration), then append one clone of it per remaining key configuration.
+ base := *r.currentCall
+ r.plan.Calls = append(r.plan.Calls, base)
+ r.currentCallID++
+
+ for i := 1; i < len(r.planInfo.entityInfo.keyConfigs); i++ {
+ cfg := r.planInfo.entityInfo.keyConfigs[i]
+ call := base
+ call.CallID = r.currentCallID
+ call.MethodName = r.entityMethodName(cfg)
+ call.Request.Name = r.entityRequestName(cfg)
+ call.Response.Name = r.entityResponseName(cfg)
+ call.Entity = &FederationEntityCall{
+ TypeName: r.planInfo.entityInfo.name,
+ KeyFields: slices.Clone(cfg.keyFieldNames),
+ }
+ call.Request.Fields = make([]RPCField, 0, 1)
+ keyMessage := &RPCMessage{
+ Name: call.MethodName + "Key",
+ }
+ for _, key := range cfg.keyFields {
+ keyMessage.Fields = append(keyMessage.Fields, RPCField{
+ Name: key.fieldName,
+ TypeName: key.fieldType,
+ JSONPath: key.fieldName,
+ })
+ }
+ call.Request.Fields = append(call.Request.Fields, RPCField{
+ Name: "keys",
+ TypeName: DataTypeMessage.String(),
+ Repeated: true,
+ JSONPath: "representations",
+ Message: keyMessage,
+ })
+
+ r.plan.Calls = append(r.plan.Calls, call)
+ r.currentCallID++
+ }
+
+ r.currentCall = nil
+ r.planInfo.currentRequestMessage = nil
+ r.planInfo.currentResponseMessage = nil
+ r.planInfo.currentResponseFieldIndex = 0
+}
+
+func (r *rpcPlanVisitor) entityMethodName(config entityKeyConfig) string {
+ if config.methodName != "" {
+ return config.methodName
+ }
+
+ return "Lookup" + config.keyTypeName
+}
+
+func (r *rpcPlanVisitor) entityRequestName(config entityKeyConfig) string {
+ if config.requestName != "" {
+ return config.requestName
+ }
+
+ return r.entityMethodName(config) + "Request"
+}
+
+func (r *rpcPlanVisitor) entityResponseName(config entityKeyConfig) string {
+ if config.responseName != "" {
+ return config.responseName
+ }
+
+ return r.entityMethodName(config) + "Response"
+}
+
func (r *rpcPlanVisitor) resolveServiceName() string {
if r.mapping == nil || r.mapping.Service == "" {
return r.subgraphName
@@ -801,10 +985,6 @@ func (r *rpcPlanVisitor) rpcMethodName() string {
// buildQueryMethodName constructs a method name for query operations.
func (r *rpcPlanVisitor) buildQueryMethodName() string {
- if r.planInfo.isEntityLookup && r.planInfo.entityInfo.name != "" {
- return "Lookup" + r.planInfo.entityInfo.keyTypeName
- }
-
return "Query" + cases.Title(language.Und, cases.NoLower).String(r.planInfo.operationFieldName)
}
@@ -861,6 +1041,43 @@ func (r *rpcPlanVisitor) parseGraphQLType(t *ast.Type) DataType {
}
}
+// parseKeyDirectiveFields splits the field set of a key directive into field names.
+//
+// Whitespace, commas and braces all act as separators, so "id, sku" and "user{id}"
+// tokenize the same way as "id sku" and "user { id }". An empty key yields nil; a key
+// without any field name yields an empty, non-nil slice.
+func parseKeyDirectiveFields(key string) []string {
+	if key == "" {
+		return nil
+	}
+
+	// Nested keys are not supported by the gRPC entity planner yet: braces are dropped,
+	// which leaves nested field names flattened next to their parent field.
+	// Commas are insignificant in GraphQL, so they must not stick to a field name.
+	separators := strings.NewReplacer(",", " ", "{", " ", "}", " ")
+	fields := strings.Fields(separators.Replace(key))
+
+	// Copy into a freshly made slice so the result is non-nil even without fields.
+	return append(make([]string, 0, len(fields)), fields...)
+}
+
+// uniqueEntityKeyConfigs removes key configurations whose key field names repeat an
+// earlier entry. Two configurations count as equal when their key field names, joined by
+// a single space, are identical, so the order of the names matters. The first occurrence
+// is kept and the relative order is preserved. A slice with at most one element is
+// returned as is.
+func uniqueEntityKeyConfigs(configs []entityKeyConfig) []entityKeyConfig {
+	if len(configs) < 2 {
+		return configs
+	}
+
+	known := make(map[string]struct{}, len(configs))
+	unique := make([]entityKeyConfig, 0, len(configs))
+	for _, candidate := range configs {
+		signature := strings.Join(candidate.keyFieldNames, " ")
+		if _, duplicate := known[signature]; !duplicate {
+			known[signature] = struct{}{}
+			unique = append(unique, candidate)
+		}
+	}
+
+	return unique
+}
+
func (r *rpcPlanVisitor) typeIsNullableOrNestedList(typeRef int) bool {
if !r.definition.TypeIsNonNull(typeRef) && r.definition.TypeIsList(typeRef) {
return true
diff --git a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
index a718b71..6ed43a2 100644
--- a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource.go
@@ -12,6 +12,8 @@ import (
"errors"
"fmt"
"strconv"
+ "strings"
+ "sync"
"github.com/tidwall/gjson"
"github.com/wundergraph/astjson"
@@ -93,8 +95,18 @@ func (d *DataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer)
a := astjson.Arena{}
root := a.NewObject()
- // make gRPC calls
+ regularInvocations := make([]Invocation, 0, len(invocations))
+ entityInvocations := make([]Invocation, 0, len(invocations))
for _, invocation := range invocations {
+ if invocation.Call != nil && invocation.Call.Entity != nil {
+ entityInvocations = append(entityInvocations, invocation)
+ continue
+ }
+ regularInvocations = append(regularInvocations, invocation)
+ }
+
+ // make regular gRPC calls
+ for _, invocation := range regularInvocations {
// Invoke the gRPC method - this will populate invocation.Output
methodName := fmt.Sprintf("/%s/%s", invocation.ServiceName, invocation.MethodName)
err := d.cc.Invoke(ctx, methodName, invocation.Input, invocation.Output)
@@ -115,6 +127,13 @@ func (d *DataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer)
}
}
+ if len(entityInvocations) > 0 {
+ err = d.executeEntityInvocations(ctx, &a, root, variables, entityInvocations, out)
+ if err != nil {
+ return err
+ }
+ }
+
data := a.NewObject()
data.Set("data", root)
out.Write(data.MarshalTo(nil))
@@ -122,6 +141,87 @@ func (d *DataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer)
return nil
}
+// executeEntityInvocations runs all entity lookup invocations concurrently and merges
+// their results into root as one "_entities" list with one slot per representation.
+//
+// Each invocation's EntityResultIndexes maps the i-th entity of its response to a slot in
+// that list. Slots that no invocation fills, and indexes that fall outside the response or
+// the list, leave the slot as null. Nothing is merged when the variables carry no
+// representations.
+//
+// Failures are returned instead of being written to out. The caller writes the data
+// envelope to out once this function returns nil, so writing an error document here and
+// returning nil would leave two concatenated JSON documents in the buffer. The out
+// parameter is kept for signature compatibility and is not written to. When several
+// invocations fail, only the first recorded error is returned.
+func (d *DataSource) executeEntityInvocations(ctx context.Context, arena *astjson.Arena, root *astjson.Value, variables gjson.Result, entityInvocations []Invocation, out *bytes.Buffer) error {
+	var (
+		wg       sync.WaitGroup
+		errMu    sync.Mutex
+		firstErr error
+	)
+
+	// Each goroutine works on its own invocation; firstErr is the only shared value.
+	for i := range entityInvocations {
+		wg.Add(1)
+		go func(invocation *Invocation) {
+			defer wg.Done()
+			methodName := fmt.Sprintf("/%s/%s", invocation.ServiceName, invocation.MethodName)
+			err := d.cc.Invoke(ctx, methodName, invocation.Input, invocation.Output)
+			if err == nil {
+				return
+			}
+			errMu.Lock()
+			defer errMu.Unlock()
+			if firstErr == nil {
+				firstErr = err
+			}
+		}(&entityInvocations[i])
+	}
+	wg.Wait()
+
+	if firstErr != nil {
+		return firstErr
+	}
+
+	// Start with null in every slot so unresolved representations stay null.
+	representationCount := len(variables.Get("representations").Array())
+	entityResults := make([]string, representationCount)
+	for i := range entityResults {
+		entityResults[i] = "null"
+	}
+
+	for i := range entityInvocations {
+		invocation := &entityInvocations[i]
+		responseJSON, err := d.marshalResponseJSON(arena, &invocation.Call.Response, invocation.Output)
+		if err != nil {
+			return err
+		}
+
+		rawEntities := gjson.ParseBytes(responseJSON.MarshalTo(nil)).Get("_entities").Array()
+		for position, resultIndex := range invocation.EntityResultIndexes {
+			if position >= len(rawEntities) || resultIndex < 0 || resultIndex >= len(entityResults) {
+				continue
+			}
+			entityResults[resultIndex] = rawEntities[position].Raw
+		}
+	}
+
+	if representationCount > 0 {
+		entityResponse := buildEntitiesResponse(entityResults)
+		parsedEntityResponse, err := astjson.ParseBytesWithoutCache([]byte(entityResponse))
+		if err != nil {
+			return err
+		}
+
+		merged, _, err := astjson.MergeValues(root, parsedEntityResponse)
+		if err != nil {
+			return err
+		}
+		*root = *merged
+	}
+
+	return nil
+}
+
+// buildEntitiesResponse wraps the given raw JSON values in an object of the form
+// {"_entities":[v0,v1,...]}. The values are inserted verbatim, in order and separated by
+// commas; they are neither validated nor escaped.
+func buildEntitiesResponse(entities []string) string {
+	return `{"_entities":[` + strings.Join(entities, ",") + `]}`
+}
+
// LoadWithFiles implements resolve.DataSource interface.
// Similar to Load, but handles file uploads if needed.
//
diff --git a/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_federation_test.go b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_federation_test.go
new file mode 100644
index 0000000..2db0ef1
--- /dev/null
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/grpc_datasource_federation_test.go
@@ -0,0 +1,87 @@
+package grpcdatasource
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/astparser"
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/grpctest"
+)
+
+// Test_DataSource_Load_FederationEntityLookup_MultipleTypes_PreservesOrder sends one
+// _entities request whose representations interleave Product and Storage entities and
+// checks that the resolved entities come back in the order of the representations.
+func Test_DataSource_Load_FederationEntityLookup_MultipleTypes_PreservesOrder(t *testing.T) {
+	conn, cleanup := setupTestGRPCServer(t)
+	t.Cleanup(cleanup)
+
+	query := `query EntityLookup($representations: [_Any!]!) {
+ _entities(representations: $representations) {
+ ... on Product {
+ __typename
+ id
+ name
+ }
+ ... on Storage {
+ __typename
+ id
+ name
+ location
+ }
+ }
+ }`
+
+	schemaDoc := grpctest.MustGraphQLSchema(t)
+	queryDoc, report := astparser.ParseGraphqlDocumentString(query)
+	require.False(t, report.HasErrors(), report.Error())
+
+	compiler, err := NewProtoCompiler(grpctest.MustProtoSchema(t), nil)
+	require.NoError(t, err)
+
+	// One lookup RPC per entity type, both keyed by "id".
+	mapping := &GRPCMapping{
+		EntityRPCs: map[string][]EntityRPCConfig{
+			"Product": {
+				{Key: "id", RPCConfig: RPCConfig{RPC: "LookupProductById", Request: "LookupProductByIdRequest", Response: "LookupProductByIdResponse"}},
+			},
+			"Storage": {
+				{Key: "id", RPCConfig: RPCConfig{RPC: "LookupStorageById", Request: "LookupStorageByIdRequest", Response: "LookupStorageByIdResponse"}},
+			},
+		},
+	}
+
+	ds, err := NewDataSource(conn, DataSourceConfig{
+		Operation:    &queryDoc,
+		Definition:   &schemaDoc,
+		SubgraphName: "Products",
+		Compiler:     compiler,
+		Mapping:      mapping,
+	})
+	require.NoError(t, err)
+
+	// The Storage representation sits between the two Product representations.
+	input := fmt.Sprintf(`{"query":%q,"body":{"variables":{"representations":[{"__typename":"Product","id":"p-1"},{"__typename":"Storage","id":"s-1"},{"__typename":"Product","id":"p-2"}]}}}`, query)
+
+	var out bytes.Buffer
+	err = ds.Load(context.Background(), []byte(input), &out)
+	require.NoError(t, err)
+
+	var response struct {
+		Data struct {
+			Entities []map[string]any `json:"_entities"`
+		} `json:"data"`
+	}
+	err = json.Unmarshal(out.Bytes(), &response)
+	require.NoError(t, err)
+
+	// Expected entities, listed in the order of the representations above.
+	expected := []struct {
+		typeName string
+		id       string
+		name     string
+	}{
+		{typeName: "Product", id: "p-1", name: "Product p-1"},
+		{typeName: "Storage", id: "s-1", name: "Storage s-1"},
+		{typeName: "Product", id: "p-2", name: "Product p-2"},
+	}
+	require.Len(t, response.Data.Entities, len(expected))
+
+	for i, want := range expected {
+		entity := response.Data.Entities[i]
+		require.Equal(t, want.typeName, entity["__typename"])
+		require.Equal(t, want.id, entity["id"])
+		require.Equal(t, want.name, entity["name"])
+	}
+}
diff --git a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/mapping_test_helper.go b/app/v2/pkg/engine/datasource/grpc_datasource/mapping_test_helper.go
index 146f371..1ace3cd 100644
--- a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/grpc_datasource/mapping_test_helper.go
+++ b/app/v2/pkg/engine/datasource/grpc_datasource/mapping_test_helper.go
@@ -218,21 +218,25 @@ func testMapping() *GRPCMapping {
},
},
SubscriptionRPCs: RPCConfigMap{},
- EntityRPCs: map[string]EntityRPCConfig{
+ EntityRPCs: map[string][]EntityRPCConfig{
"Product": {
- Key: "id",
- RPCConfig: RPCConfig{
- RPC: "LookupProductById",
- Request: "LookupProductByIdRequest",
- Response: "LookupProductByIdResponse",
+ {
+ Key: "id",
+ RPCConfig: RPCConfig{
+ RPC: "LookupProductById",
+ Request: "LookupProductByIdRequest",
+ Response: "LookupProductByIdResponse",
+ },
},
},
"Storage": {
- Key: "id",
- RPCConfig: RPCConfig{
- RPC: "LookupStorageById",
- Request: "LookupStorageByIdRequest",
- Response: "LookupStorageByIdResponse",
+ {
+ Key: "id",
+ RPCConfig: RPCConfig{
+ RPC: "LookupStorageById",
+ Request: "LookupStorageByIdRequest",
+ Response: "LookupStorageByIdResponse",
+ },
},
},
},
diff --git a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/grpctest/mapping/mapping.go b/app/v2/pkg/grpctest/mapping/mapping.go
index 842a029..1476d08 100644
--- a/tmp/agent-patch-flux-pr-1262.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/grpctest/mapping/mapping.go
+++ b/app/v2/pkg/grpctest/mapping/mapping.go
@@ -225,21 +225,25 @@ func DefaultGRPCMapping() *grpcdatasource.GRPCMapping {
},
},
SubscriptionRPCs: grpcdatasource.RPCConfigMap{},
- EntityRPCs: map[string]grpcdatasource.EntityRPCConfig{
+ EntityRPCs: map[string][]grpcdatasource.EntityRPCConfig{
"Product": {
- Key: "id",
- RPCConfig: grpcdatasource.RPCConfig{
- RPC: "LookupProductById",
- Request: "LookupProductByIdRequest",
- Response: "LookupProductByIdResponse",
+ {
+ Key: "id",
+ RPCConfig: grpcdatasource.RPCConfig{
+ RPC: "LookupProductById",
+ Request: "LookupProductByIdRequest",
+ Response: "LookupProductByIdResponse",
+ },
},
},
"Storage": {
- Key: "id",
- RPCConfig: grpcdatasource.RPCConfig{
- RPC: "LookupStorageById",
- Request: "LookupStorageByIdRequest",
- Response: "LookupStorageByIdResponse",
+ {
+ Key: "id",
+ RPCConfig: grpcdatasource.RPCConfig{
+ RPC: "LookupStorageById",
+ Request: "LookupStorageByIdRequest",
+ Response: "LookupStorageByIdResponse",
+ },
},
},
},