agent_patch
runs/2026-02-28__13-38-10__gpt-5-1-codex-mini/flux-pr-817/flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/agent-logs/agent.patch
21179 bytes
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go b/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
index 1cb2914..94cb1ac 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
@@ -3,7 +3,8 @@ package pubsub_datasource
import (
"encoding/json"
"fmt"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+ "strings"
+
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"slices"
@@ -35,37 +36,40 @@ type NatsEventManager struct {
}
func (p *NatsEventManager) extractEventSubject(ref int, subject string) (string, error) {
- matches := eventSubjectRegex.FindAllStringSubmatch(subject, -1)
- if len(matches) != 1 || len(matches[0]) != 2 {
- return "", fmt.Errorf("expected subject to match regex")
- }
- argumentName := matches[0][1]
- // We need to find the argument in the operation
- argumentRef, ok := p.visitor.Operation.FieldArgument(ref, []byte(argumentName))
+ fieldDefRef, ok := p.visitor.Walker.FieldDefinition(ref)
if !ok {
- return "", fmt.Errorf("argument \"%s\" is not defined", argumentName)
+ return "", fmt.Errorf("field definition missing for field ref %d", ref)
}
- argumentValue := p.visitor.Operation.ArgumentValue(argumentRef)
- if argumentValue.Kind != ast.ValueKindVariable {
- return "", fmt.Errorf("expected argument \"%s\" kind to be \"ValueKindVariable\" but received \"%s\"", argumentName, argumentValue.Kind)
- }
- variableName := p.visitor.Operation.VariableValueNameBytes(argumentValue.Ref)
- variableDefinition, ok := p.visitor.Operation.VariableDefinitionByNameAndOperation(p.visitor.Walker.Ancestors[0].Ref, variableName)
+ operationDefRef, ok := plan.OperationDefinitionRef(p.visitor.Walker)
if !ok {
- return "", fmt.Errorf("expected definition to exist for variable \"%s\"", variableName)
+ return "", fmt.Errorf("operation definition missing while parsing subject")
}
- variableTypeRef := p.visitor.Operation.VariableDefinitions[variableDefinition].Type
- renderer, err := resolve.NewPlainVariableRendererWithValidationFromTypeRef(p.visitor.Operation, p.visitor.Definition, variableTypeRef, string(variableName))
+ template, err := plan.ParseArgumentTemplate(p.visitor.Operation, p.visitor.Definition, operationDefRef, ref, fieldDefRef, []byte(subject))
if err != nil {
return "", err
}
- contextVariable := &resolve.ContextVariable{
- Path: []string{string(variableName)},
- Renderer: renderer,
+ var builder strings.Builder
+ for _, segment := range template.Segments {
+ if segment.Variable == nil {
+ builder.Write(segment.Static)
+ continue
+ }
+ renderer, err := resolve.NewPlainVariableRendererWithValidationFromTypeRef(p.visitor.Operation, p.visitor.Definition, segment.Variable.VariableTypeRef, segment.Variable.VariablePath...)
+ if err != nil {
+ return "", err
+ }
+ contextVariable := &resolve.ContextVariable{
+ Path: append([]string(nil), segment.Variable.VariablePath...),
+ Renderer: renderer,
+ }
+ placeholder, _ := p.variables.AddVariable(contextVariable)
+ builder.WriteString(placeholder)
}
- // We need to replace the template literal with the variable placeholder (and reuse if it already exists)
- variablePlaceHolder, _ := p.variables.AddVariable(contextVariable) // $$0$$
- return eventSubjectRegex.ReplaceAllLiteralString(subject, variablePlaceHolder), nil
+ subjectWithPlaceholders := builder.String()
+ if err := validateNatsSubject(subjectWithPlaceholders); err != nil {
+ return "", err
+ }
+ return subjectWithPlaceholders, nil
}
func (p *NatsEventManager) eventDataBytes(ref int) ([]byte, error) {
@@ -120,3 +124,19 @@ func (p *NatsEventManager) handleSubscriptionEvent(ref int) {
StreamConfiguration: p.eventConfiguration.StreamConfiguration,
}
}
+
+func validateNatsSubject(subject string) error {
+ if len(subject) == 0 {
+ return fmt.Errorf("subject must not be empty")
+ }
+ if strings.ContainsAny(subject, " \t\r\n") {
+ return fmt.Errorf("subject must not contain whitespace")
+ }
+ if subject[0] == '.' || subject[len(subject)-1] == '.' {
+ return fmt.Errorf("subject must not start or end with '.'")
+ }
+ if strings.Contains(subject, "..") {
+ return fmt.Errorf("subject must not contain empty segments")
+ }
+ return nil
+}
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
index 2c817b1..fac32ce 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
@@ -44,6 +44,7 @@ func TestPubSub(t *testing.T) {
type Subscription {
helloSubscription(id: String!): String! @eventsSubscribe(subjects: ["helloSubscription.{{ args.id }}"])
subscriptionWithMultipleSubjects(firstId: String!, secondId: String!): String! @eventsSubscribe(subjects: ["firstSubscription.{{ args.firstId }}", "secondSubscription.{{ args.secondId }}"])
+ subscriptionWithMultiTemplates(firstId: String!, secondId: String!): String! @eventsSubscribe(subjects: ["multi.{{ args.firstId }}.{{ args.secondId }}"])
}`
dataSourceCustomConfig := Configuration{
@@ -92,6 +93,17 @@ func TestPubSub(t *testing.T) {
Subjects: []string{"firstSubscription.{{ args.firstId }}", "secondSubscription.{{ args.secondId }}"},
},
},
+ {
+ Metadata: &EventMetadata{
+ ProviderID: "default",
+ FieldName: "subscriptionWithMultiTemplates",
+ Type: EventTypeSubscribe,
+ TypeName: "Subscription",
+ },
+ Configuration: &NatsEventConfiguration{
+ Subjects: []string{"multi.{{ args.firstId }}.{{ args.secondId }}"},
+ },
+ },
},
}
@@ -116,6 +128,10 @@ func TestPubSub(t *testing.T) {
TypeName: "Subscription",
FieldNames: []string{"subscriptionWithMultipleSubjects"},
},
+ {
+ TypeName: "Subscription",
+ FieldNames: []string{"subscriptionWithMultiTemplates"},
+ },
},
},
dataSourceCustomConfig,
@@ -171,6 +187,20 @@ func TestPubSub(t *testing.T) {
},
},
},
+ {
+ TypeName: "Subscription",
+ FieldName: "subscriptionWithMultiTemplates",
+ Arguments: []plan.ArgumentConfiguration{
+ {
+ Name: "firstId",
+ SourceType: plan.FieldArgumentSource,
+ },
+ {
+ Name: "secondId",
+ SourceType: plan.FieldArgumentSource,
+ },
+ },
+ },
},
DisableResolveFieldPositions: true,
}
@@ -332,4 +362,45 @@ func TestPubSub(t *testing.T) {
}
datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
})
+
+ t.Run("subscription with multi templates in subject", func(t *testing.T) {
+ const operation = "subscription SubscriptionWithMultiTemplates { subscriptionWithMultiTemplates(firstId: 11, secondId: 23) }"
+ const operationName = `SubscriptionWithMultiTemplates`
+ expect := &plan.SubscriptionResponsePlan{
+ Response: &resolve.GraphQLSubscription{
+ Trigger: resolve.GraphQLSubscriptionTrigger{
+ Input: []byte(`{"providerId":"default","subjects":["multi.$$0$$.$$1$$"]}`),
+ Variables: resolve.Variables{
+ &resolve.ContextVariable{
+ Path: []string{"a"},
+ Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+ },
+ &resolve.ContextVariable{
+ Path: []string{"b"},
+ Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+ },
+ },
+ Source: &NatsSubscriptionSource{
+ pubSub: &testPubsub{},
+ },
+ PostProcessing: resolve.PostProcessingConfiguration{
+ MergePath: []string{"subscriptionWithMultiTemplates"},
+ },
+ },
+ Response: &resolve.GraphQLResponse{
+ Data: &resolve.Object{
+ Fields: []*resolve.Field{
+ {
+ Name: []byte("subscriptionWithMultiTemplates"),
+ Value: &resolve.String{
+ Path: []string{"subscriptionWithMultiTemplates"},
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+ datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
+ })
}
diff --git a/app/v2/pkg/engine/plan/argument_template.go b/app/v2/pkg/engine/plan/argument_template.go
new file mode 100644
index 0000000..2aa3003
--- /dev/null
+++ b/app/v2/pkg/engine/plan/argument_template.go
@@ -0,0 +1,194 @@
+package plan
+
+import (
+ "fmt"
+ "regexp"
+ "strings"
+
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/astvisitor"
+)
+
+var argumentTemplateRegex = regexp.MustCompile(`{{\s*args((?:\.[a-zA-Z0-9_]+)+)\s*}}`)
+
+// ArgumentTemplate describes the string broken down into static and argument-derived segments.
+type ArgumentTemplate struct {
+ Segments []ArgumentTemplateSegment
+}
+
+// ArgumentTemplateSegment is either a static chunk or an argument template.
+type ArgumentTemplateSegment struct {
+ Static []byte
+ Variable *ArgumentTemplateVariable
+}
+
+// ArgumentTemplateVariable describes a single {{ args.<...> }} reference.
+type ArgumentTemplateVariable struct {
+ ArgumentName string
+ VariableName string
+ VariablePath []string
+ VariableTypeRef int
+}
+
+// ParseArgumentTemplate parses the value into template segments and validates each argument path.
+// It returns the static/variable pieces and the variable metadata required to render the final value.
+func ParseArgumentTemplate(operation, definition *ast.Document, operationDefinitionRef, fieldRef, fieldDefinitionRef int, value []byte) (ArgumentTemplate, error) {
+ if len(value) == 0 {
+ return ArgumentTemplate{Segments: []ArgumentTemplateSegment{{Static: nil}}}, nil
+ }
+
+ segments := make([]ArgumentTemplateSegment, 0)
+ matches := argumentTemplateRegex.FindAllSubmatchIndex(value, -1)
+ if len(matches) == 0 {
+ segments = append(segments, ArgumentTemplateSegment{Static: append([]byte(nil), value...)})
+ return ArgumentTemplate{Segments: segments}, nil
+ }
+
+ if fieldDefinitionRef == ast.InvalidRef {
+ return ArgumentTemplate{}, fmt.Errorf("field definition is not available for parsing argument templates")
+ }
+ if operationDefinitionRef == ast.InvalidRef {
+ return ArgumentTemplate{}, fmt.Errorf("operation definition is not available for parsing argument templates")
+ }
+
+ prev := 0
+ for _, match := range matches {
+ if match[0] > prev {
+ segments = append(segments, ArgumentTemplateSegment{Static: append([]byte(nil), value[prev:match[0]]...)})
+ }
+
+ argumentPathBytes := value[match[2]:match[3]]
+ if len(argumentPathBytes) == 0 || argumentPathBytes[0] != '.' {
+ return ArgumentTemplate{}, fmt.Errorf("invalid argument path in template")
+ }
+ parts := strings.Split(string(argumentPathBytes[1:]), ".")
+ if len(parts) == 0 {
+ return ArgumentTemplate{}, fmt.Errorf("empty argument path in template")
+ }
+ variable, err := resolveArgumentTemplateVariable(operation, definition, operationDefinitionRef, fieldRef, fieldDefinitionRef, parts)
+ if err != nil {
+ return ArgumentTemplate{}, err
+ }
+ segments = append(segments, ArgumentTemplateSegment{Variable: variable})
+ prev = match[1]
+ }
+
+ if prev < len(value) {
+ segments = append(segments, ArgumentTemplateSegment{Static: append([]byte(nil), value[prev:]...)})
+ }
+
+ return ArgumentTemplate{Segments: segments}, nil
+}
+
+func resolveArgumentTemplateVariable(operation, definition *ast.Document, operationDefinitionRef, fieldRef, fieldDefinitionRef int, path []string) (*ArgumentTemplateVariable, error) {
+ argumentName := path[0]
+ argumentRef, ok := operation.FieldArgument(fieldRef, []byte(argumentName))
+ if !ok {
+ return nil, fmt.Errorf("field argument %q is not defined", argumentName)
+ }
+ argumentValue := operation.ArgumentValue(argumentRef)
+ if argumentValue.Kind != ast.ValueKindVariable {
+ return nil, fmt.Errorf("argument %q must be backed by a variable", argumentName)
+ }
+ variableName := operation.VariableValueNameString(argumentValue.Ref)
+ variableDefinitionRef, exists := operation.VariableDefinitionByNameAndOperation(operationDefinitionRef, operation.VariableValueNameBytes(argumentValue.Ref))
+ if !exists {
+ return nil, fmt.Errorf("variable definition %q is not available", variableName)
+ }
+ variableTypeRef := operation.VariableDefinitions[variableDefinitionRef].Type
+
+ argumentDefinitionRef := findFieldArgumentDefinition(definition, fieldDefinitionRef, argumentName)
+ if argumentDefinitionRef == ast.InvalidRef {
+ return nil, fmt.Errorf("argument definition %q could not be resolved", argumentName)
+ }
+ argumentTypeRef := definition.InputValueDefinitions[argumentDefinitionRef].Type
+
+ nested := path[1:]
+ if err := validateArgumentPath(definition, argumentTypeRef, nested); err != nil {
+ return nil, err
+ }
+
+ variablePath := append([]string{variableName}, nested...)
+ return &ArgumentTemplateVariable{
+ ArgumentName: argumentName,
+ VariableName: variableName,
+ VariablePath: variablePath,
+ VariableTypeRef: variableTypeRef,
+ }, nil
+}
+
+func findFieldArgumentDefinition(definition *ast.Document, fieldDefinitionRef int, argumentName string) int {
+ for _, argRef := range definition.FieldDefinitionArgumentsDefinitions(fieldDefinitionRef) {
+ if definition.InputValueDefinitionNameString(argRef) == argumentName {
+ return argRef
+ }
+ }
+ return ast.InvalidRef
+}
+
+func validateArgumentPath(definition *ast.Document, typeRef int, path []string) error {
+ current := typeRef
+ for i, segment := range path {
+ if !isInputObject(definition, current) {
+ return fmt.Errorf("cannot traverse field %q because %s is not an input object", segment, definition.TypeNameString(current))
+ }
+ typeName := definition.TypeNameString(current)
+ node, ok := definition.Index.FirstNodeByNameStr(typeName)
+ if !ok {
+ return fmt.Errorf("input type %s not found", typeName)
+ }
+ fieldRef := definition.InputObjectTypeDefinitionInputValueDefinitionByName(node.Ref, []byte(segment))
+ if fieldRef == ast.InvalidRef {
+ return fmt.Errorf("field %q does not exist on %s", segment, definition.TypeNameString(current))
+ }
+ current = definition.InputValueDefinitions[fieldRef].Type
+ if i < len(path)-1 {
+ if !isInputObject(definition, current) {
+ return fmt.Errorf("cannot traverse beyond %q because %s is not an input object", segment, definition.TypeNameString(current))
+ }
+ }
+ }
+ if !isLeaf(definition, current) {
+ return fmt.Errorf("argument path must end on a leaf type but %s is not a leaf", definition.TypeNameString(current))
+ }
+ return nil
+}
+
+func isInputObject(definition *ast.Document, typeRef int) bool {
+ base := unwrapNonNull(definition, typeRef)
+ if definition.TypeIsList(base) {
+ return false
+ }
+ node, exists := definition.Index.FirstNodeByNameStr(definition.TypeNameString(base))
+ return exists && node.Kind == ast.NodeKindInputObjectTypeDefinition
+}
+
+func isLeaf(definition *ast.Document, typeRef int) bool {
+ base := definition.ResolveUnderlyingType(typeRef)
+ name := definition.TypeNameString(base)
+ node, exists := definition.Index.FirstNodeByNameStr(name)
+ if exists {
+ return node.Kind == ast.NodeKindScalarTypeDefinition || node.Kind == ast.NodeKindEnumTypeDefinition
+ }
+ switch name {
+ case "Boolean", "String", "ID", "Int", "Float":
+ return true
+ }
+ return false
+}
+
+func unwrapNonNull(definition *ast.Document, typeRef int) int {
+ for definition.Types[typeRef].TypeKind == ast.TypeKindNonNull {
+ typeRef = definition.Types[typeRef].OfType
+ }
+ return typeRef
+}
+
+func OperationDefinitionRef(walker *astvisitor.Walker) (int, bool) {
+ for _, ancestor := range walker.Ancestors {
+ if ancestor.Kind == ast.NodeKindOperationDefinition {
+ return ancestor.Ref, true
+ }
+ }
+ return ast.InvalidRef, false
+}
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/plan/configuration_visitor.go b/app/v2/pkg/engine/plan/configuration_visitor.go
index b5e2ed1..682c76d 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/plan/configuration_visitor.go
+++ b/app/v2/pkg/engine/plan/configuration_visitor.go
@@ -1,9 +1,7 @@
package plan
import (
- "bytes"
"fmt"
- "regexp"
"slices"
"strings"
@@ -870,85 +868,42 @@ func (c *configurationVisitor) buildSubscriptionFilterCondition(condition Subscr
return filter
}
-var (
- // subscriptionFieldFilterRegex is used to extract the variable name from the subscription filter condition
- // e.g. {{ args.id }} -> id
- // e.g. {{ args.input.id }} -> input.id
- subscriptionFieldFilterRegex = regexp.MustCompile(`{{\s*args((?:\.[a-zA-Z0-9_]+)+)\s*}}`)
-)
-
-// ContainsTemplateString checks if the value contains a template string
-func ContainsTemplateString(value []byte) bool {
- return bytes.Contains(value, []byte("{{"))
-}
-
func (c *configurationVisitor) buildSubscriptionFieldFilter(condition *SubscriptionFieldCondition) *resolve.SubscriptionFieldFilter {
filter := &resolve.SubscriptionFieldFilter{}
filter.FieldPath = condition.FieldPath
filter.Values = make([]resolve.InputTemplate, len(condition.Values))
+ fieldDefinitionRef, ok := c.walker.FieldDefinition(c.fieldRef)
+ if !ok {
+ c.walker.StopWithInternalErr(fmt.Errorf("field definition is not available for subscription filter"))
+ return nil
+ }
+ operationDefinitionRef, ok := OperationDefinitionRef(c.walker)
+ if !ok {
+ c.walker.StopWithInternalErr(fmt.Errorf("operation definition is not available for subscription filter"))
+ return nil
+ }
for i, value := range condition.Values {
- matches := subscriptionFieldFilterRegex.FindAllStringSubmatchIndex(value, -1)
- if len(matches) == 0 {
- filter.Values[i].Segments = []resolve.TemplateSegment{
- {
- SegmentType: resolve.StaticSegmentType,
- Data: []byte(value),
- },
- }
- continue
+ template, err := ParseArgumentTemplate(c.operation, c.definition, operationDefinitionRef, c.fieldRef, fieldDefinitionRef, []byte(value))
+ if err != nil {
+ c.walker.StopWithInternalErr(fmt.Errorf("failed to parse subscription filter value %q: %w", value, err))
+ return nil
}
- if len(matches) == 1 && len(matches[0]) == 4 {
- prefix := value[:matches[0][0]]
- hasPrefix := len(prefix) > 0
- // the path begins with ".", so ignore the first empty string element with trailing [1:]
- argumentPath := strings.Split(value[matches[0][2]:matches[0][3]][1:], ".")
- argumentName := argumentPath[0]
- argumentRef, ok := c.operation.FieldArgument(c.fieldRef, []byte(argumentName))
- if !ok {
- c.walker.StopWithInternalErr(fmt.Errorf(`field argument "%s" is not defined`, argumentName))
- return nil
- }
- argumentValue := c.operation.ArgumentValue(argumentRef)
- if argumentValue.Kind != ast.ValueKindVariable {
- c.walker.StopWithInternalErr(fmt.Errorf(`expected argument "%s" kind to be "ValueKindVariable" but received "%s"`, argumentName, argumentValue.Kind))
- return nil
- }
- variableName := c.operation.VariableValueNameString(argumentValue.Ref)
- // the variable path should be the variable name, e.g., "a", and then the 2nd element from the path onwards
- variablePath := append([]string{variableName}, argumentPath[1:]...)
- suffix := value[matches[0][1]:]
- hasSuffix := len(suffix) > 0
- size := 1
- if hasPrefix {
- size++
- }
- if hasSuffix {
- size++
- }
- filter.Values[i].Segments = make([]resolve.TemplateSegment, size)
- idx := 0
- if hasPrefix {
- filter.Values[i].Segments[idx] = resolve.TemplateSegment{
+ filter.Values[i].Segments = make([]resolve.TemplateSegment, 0, len(template.Segments))
+ for _, segment := range template.Segments {
+ if segment.Variable == nil {
+ filter.Values[i].Segments = append(filter.Values[i].Segments, resolve.TemplateSegment{
SegmentType: resolve.StaticSegmentType,
- Data: []byte(prefix),
- }
- idx++
+ Data: append([]byte(nil), segment.Static...),
+ })
+ continue
}
- filter.Values[i].Segments[idx] = resolve.TemplateSegment{
+ filter.Values[i].Segments = append(filter.Values[i].Segments, resolve.TemplateSegment{
SegmentType: resolve.VariableSegmentType,
VariableKind: resolve.ContextVariableKind,
+ VariableSourcePath: append([]string(nil), segment.Variable.VariablePath...),
Renderer: resolve.NewPlainVariableRenderer(),
- VariableSourcePath: variablePath,
- }
- if hasSuffix {
- filter.Values[i].Segments[idx+1] = resolve.TemplateSegment{
- SegmentType: resolve.StaticSegmentType,
- Data: []byte(suffix),
- }
- }
- continue
+ })
}
- return nil
}
return filter
}