STET

agent_patch

runs/2026-02-28__13-38-10__gpt-5-1-codex-mini/flux-pr-817/flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/agent-logs/agent.patch

21179 bytes

Back to adjudication
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go b/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
index 1cb2914..94cb1ac 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
@@ -3,7 +3,8 @@ package pubsub_datasource
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+	"strings"
+
 	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
 	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
 	"slices"
@@ -35,37 +36,40 @@ type NatsEventManager struct {
 }
 
 func (p *NatsEventManager) extractEventSubject(ref int, subject string) (string, error) {
-	matches := eventSubjectRegex.FindAllStringSubmatch(subject, -1)
-	if len(matches) != 1 || len(matches[0]) != 2 {
-		return "", fmt.Errorf("expected subject to match regex")
-	}
-	argumentName := matches[0][1]
-	// We need to find the argument in the operation
-	argumentRef, ok := p.visitor.Operation.FieldArgument(ref, []byte(argumentName))
+	fieldDefRef, ok := p.visitor.Walker.FieldDefinition(ref)
 	if !ok {
-		return "", fmt.Errorf("argument \"%s\" is not defined", argumentName)
+		return "", fmt.Errorf("field definition missing for field ref %d", ref)
 	}
-	argumentValue := p.visitor.Operation.ArgumentValue(argumentRef)
-	if argumentValue.Kind != ast.ValueKindVariable {
-		return "", fmt.Errorf("expected argument \"%s\" kind to be \"ValueKindVariable\" but received \"%s\"", argumentName, argumentValue.Kind)
-	}
-	variableName := p.visitor.Operation.VariableValueNameBytes(argumentValue.Ref)
-	variableDefinition, ok := p.visitor.Operation.VariableDefinitionByNameAndOperation(p.visitor.Walker.Ancestors[0].Ref, variableName)
+	operationDefRef, ok := plan.OperationDefinitionRef(p.visitor.Walker)
 	if !ok {
-		return "", fmt.Errorf("expected definition to exist for variable \"%s\"", variableName)
+		return "", fmt.Errorf("operation definition missing while parsing subject")
 	}
-	variableTypeRef := p.visitor.Operation.VariableDefinitions[variableDefinition].Type
-	renderer, err := resolve.NewPlainVariableRendererWithValidationFromTypeRef(p.visitor.Operation, p.visitor.Definition, variableTypeRef, string(variableName))
+	template, err := plan.ParseArgumentTemplate(p.visitor.Operation, p.visitor.Definition, operationDefRef, ref, fieldDefRef, []byte(subject))
 	if err != nil {
 		return "", err
 	}
-	contextVariable := &resolve.ContextVariable{
-		Path:     []string{string(variableName)},
-		Renderer: renderer,
+	var builder strings.Builder
+	for _, segment := range template.Segments {
+		if segment.Variable == nil {
+			builder.Write(segment.Static)
+			continue
+		}
+		renderer, err := resolve.NewPlainVariableRendererWithValidationFromTypeRef(p.visitor.Operation, p.visitor.Definition, segment.Variable.VariableTypeRef, segment.Variable.VariablePath...)
+		if err != nil {
+			return "", err
+		}
+		contextVariable := &resolve.ContextVariable{
+			Path:     append([]string(nil), segment.Variable.VariablePath...),
+			Renderer: renderer,
+		}
+		placeholder, _ := p.variables.AddVariable(contextVariable)
+		builder.WriteString(placeholder)
 	}
-	// We need to replace the template literal with the variable placeholder (and reuse if it already exists)
-	variablePlaceHolder, _ := p.variables.AddVariable(contextVariable) // $$0$$
-	return eventSubjectRegex.ReplaceAllLiteralString(subject, variablePlaceHolder), nil
+	subjectWithPlaceholders := builder.String()
+	if err := validateNatsSubject(subjectWithPlaceholders); err != nil {
+		return "", err
+	}
+	return subjectWithPlaceholders, nil
 }
 
 func (p *NatsEventManager) eventDataBytes(ref int) ([]byte, error) {
@@ -120,3 +124,19 @@ func (p *NatsEventManager) handleSubscriptionEvent(ref int) {
 		StreamConfiguration: p.eventConfiguration.StreamConfiguration,
 	}
 }
+
+func validateNatsSubject(subject string) error {
+	if len(subject) == 0 {
+		return fmt.Errorf("subject must not be empty")
+	}
+	if strings.ContainsAny(subject, " \t\r\n") {
+		return fmt.Errorf("subject must not contain whitespace")
+	}
+	if subject[0] == '.' || subject[len(subject)-1] == '.' {
+		return fmt.Errorf("subject must not start or end with '.'")
+	}
+	if strings.Contains(subject, "..") {
+		return fmt.Errorf("subject must not contain empty segments")
+	}
+	return nil
+}
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
index 2c817b1..fac32ce 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
@@ -44,6 +44,7 @@ func TestPubSub(t *testing.T) {
 	type Subscription {
 		helloSubscription(id: String!): String! @eventsSubscribe(subjects: ["helloSubscription.{{ args.id }}"])
 		subscriptionWithMultipleSubjects(firstId: String!, secondId: String!): String! @eventsSubscribe(subjects: ["firstSubscription.{{ args.firstId }}", "secondSubscription.{{ args.secondId }}"])
+		subscriptionWithMultiTemplates(firstId: String!, secondId: String!): String! @eventsSubscribe(subjects: ["multi.{{ args.firstId }}.{{ args.secondId }}"])
 	}`
 
 	dataSourceCustomConfig := Configuration{
@@ -92,6 +93,17 @@ func TestPubSub(t *testing.T) {
 					Subjects: []string{"firstSubscription.{{ args.firstId }}", "secondSubscription.{{ args.secondId }}"},
 				},
 			},
+			{
+				Metadata: &EventMetadata{
+					ProviderID: "default",
+					FieldName:  "subscriptionWithMultiTemplates",
+					Type:       EventTypeSubscribe,
+					TypeName:   "Subscription",
+				},
+				Configuration: &NatsEventConfiguration{
+					Subjects: []string{"multi.{{ args.firstId }}.{{ args.secondId }}"},
+				},
+			},
 		},
 	}
 
@@ -116,6 +128,10 @@ func TestPubSub(t *testing.T) {
 					TypeName:   "Subscription",
 					FieldNames: []string{"subscriptionWithMultipleSubjects"},
 				},
+				{
+					TypeName:   "Subscription",
+					FieldNames: []string{"subscriptionWithMultiTemplates"},
+				},
 			},
 		},
 		dataSourceCustomConfig,
@@ -171,6 +187,20 @@ func TestPubSub(t *testing.T) {
 					},
 				},
 			},
+			{
+				TypeName:  "Subscription",
+				FieldName: "subscriptionWithMultiTemplates",
+				Arguments: []plan.ArgumentConfiguration{
+					{
+						Name:       "firstId",
+						SourceType: plan.FieldArgumentSource,
+					},
+					{
+						Name:       "secondId",
+						SourceType: plan.FieldArgumentSource,
+					},
+				},
+			},
 		},
 		DisableResolveFieldPositions: true,
 	}
@@ -332,4 +362,45 @@ func TestPubSub(t *testing.T) {
 		}
 		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
 	})
+
+	t.Run("subscription with multi templates in subject", func(t *testing.T) {
+		const operation = "subscription SubscriptionWithMultiTemplates { subscriptionWithMultiTemplates(firstId: 11, secondId: 23) }"
+		const operationName = `SubscriptionWithMultiTemplates`
+		expect := &plan.SubscriptionResponsePlan{
+			Response: &resolve.GraphQLSubscription{
+				Trigger: resolve.GraphQLSubscriptionTrigger{
+					Input: []byte(`{"providerId":"default","subjects":["multi.$$0$$.$$1$$"]}`),
+					Variables: resolve.Variables{
+						&resolve.ContextVariable{
+							Path:     []string{"a"},
+							Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+						},
+						&resolve.ContextVariable{
+							Path:     []string{"b"},
+							Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+						},
+					},
+					Source: &NatsSubscriptionSource{
+						pubSub: &testPubsub{},
+					},
+					PostProcessing: resolve.PostProcessingConfiguration{
+						MergePath: []string{"subscriptionWithMultiTemplates"},
+					},
+				},
+				Response: &resolve.GraphQLResponse{
+					Data: &resolve.Object{
+						Fields: []*resolve.Field{
+							{
+								Name: []byte("subscriptionWithMultiTemplates"),
+								Value: &resolve.String{
+									Path: []string{"subscriptionWithMultiTemplates"},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
+	})
 }
diff --git a/app/v2/pkg/engine/plan/argument_template.go b/app/v2/pkg/engine/plan/argument_template.go
new file mode 100644
index 0000000..2aa3003
--- /dev/null
+++ b/app/v2/pkg/engine/plan/argument_template.go
@@ -0,0 +1,194 @@
+package plan
+
+import (
+	"fmt"
+	"regexp"
+	"strings"
+
+	"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+	"github.com/wundergraph/graphql-go-tools/v2/pkg/astvisitor"
+)
+
+var argumentTemplateRegex = regexp.MustCompile(`{{\s*args((?:\.[a-zA-Z0-9_]+)+)\s*}}`)
+
+// ArgumentTemplate describes the string broken down into static and argument-derived segments.
+type ArgumentTemplate struct {
+	Segments []ArgumentTemplateSegment
+}
+
+// ArgumentTemplateSegment is either a static chunk or an argument template.
+type ArgumentTemplateSegment struct {
+	Static   []byte
+	Variable *ArgumentTemplateVariable
+}
+
+// ArgumentTemplateVariable describes a single {{ args.<...> }} reference.
+type ArgumentTemplateVariable struct {
+	ArgumentName    string
+	VariableName    string
+	VariablePath    []string
+	VariableTypeRef int
+}
+
+// ParseArgumentTemplate parses the value into template segments and validates each argument path.
+// It returns the static/variable pieces and the variable metadata required to render the final value.
+func ParseArgumentTemplate(operation, definition *ast.Document, operationDefinitionRef, fieldRef, fieldDefinitionRef int, value []byte) (ArgumentTemplate, error) {
+	if len(value) == 0 {
+		return ArgumentTemplate{Segments: []ArgumentTemplateSegment{{Static: nil}}}, nil
+	}
+
+	segments := make([]ArgumentTemplateSegment, 0)
+	matches := argumentTemplateRegex.FindAllSubmatchIndex(value, -1)
+	if len(matches) == 0 {
+		segments = append(segments, ArgumentTemplateSegment{Static: append([]byte(nil), value...)})
+		return ArgumentTemplate{Segments: segments}, nil
+	}
+
+	if fieldDefinitionRef == ast.InvalidRef {
+		return ArgumentTemplate{}, fmt.Errorf("field definition is not available for parsing argument templates")
+	}
+	if operationDefinitionRef == ast.InvalidRef {
+		return ArgumentTemplate{}, fmt.Errorf("operation definition is not available for parsing argument templates")
+	}
+
+	prev := 0
+	for _, match := range matches {
+		if match[0] > prev {
+			segments = append(segments, ArgumentTemplateSegment{Static: append([]byte(nil), value[prev:match[0]]...)})
+		}
+
+		argumentPathBytes := value[match[2]:match[3]]
+		if len(argumentPathBytes) == 0 || argumentPathBytes[0] != '.' {
+			return ArgumentTemplate{}, fmt.Errorf("invalid argument path in template")
+		}
+		parts := strings.Split(string(argumentPathBytes[1:]), ".")
+		if len(parts) == 0 {
+			return ArgumentTemplate{}, fmt.Errorf("empty argument path in template")
+		}
+		variable, err := resolveArgumentTemplateVariable(operation, definition, operationDefinitionRef, fieldRef, fieldDefinitionRef, parts)
+		if err != nil {
+			return ArgumentTemplate{}, err
+		}
+		segments = append(segments, ArgumentTemplateSegment{Variable: variable})
+		prev = match[1]
+	}
+
+	if prev < len(value) {
+		segments = append(segments, ArgumentTemplateSegment{Static: append([]byte(nil), value[prev:]...)})
+	}
+
+	return ArgumentTemplate{Segments: segments}, nil
+}
+
+func resolveArgumentTemplateVariable(operation, definition *ast.Document, operationDefinitionRef, fieldRef, fieldDefinitionRef int, path []string) (*ArgumentTemplateVariable, error) {
+	argumentName := path[0]
+	argumentRef, ok := operation.FieldArgument(fieldRef, []byte(argumentName))
+	if !ok {
+		return nil, fmt.Errorf("field argument %q is not defined", argumentName)
+	}
+	argumentValue := operation.ArgumentValue(argumentRef)
+	if argumentValue.Kind != ast.ValueKindVariable {
+		return nil, fmt.Errorf("argument %q must be backed by a variable", argumentName)
+	}
+	variableName := operation.VariableValueNameString(argumentValue.Ref)
+	variableDefinitionRef, exists := operation.VariableDefinitionByNameAndOperation(operationDefinitionRef, operation.VariableValueNameBytes(argumentValue.Ref))
+	if !exists {
+		return nil, fmt.Errorf("variable definition %q is not available", variableName)
+	}
+	variableTypeRef := operation.VariableDefinitions[variableDefinitionRef].Type
+
+	argumentDefinitionRef := findFieldArgumentDefinition(definition, fieldDefinitionRef, argumentName)
+	if argumentDefinitionRef == ast.InvalidRef {
+		return nil, fmt.Errorf("argument definition %q could not be resolved", argumentName)
+	}
+	argumentTypeRef := definition.InputValueDefinitions[argumentDefinitionRef].Type
+
+	nested := path[1:]
+	if err := validateArgumentPath(definition, argumentTypeRef, nested); err != nil {
+		return nil, err
+	}
+
+	variablePath := append([]string{variableName}, nested...)
+	return &ArgumentTemplateVariable{
+		ArgumentName:    argumentName,
+		VariableName:    variableName,
+		VariablePath:    variablePath,
+		VariableTypeRef: variableTypeRef,
+	}, nil
+}
+
+func findFieldArgumentDefinition(definition *ast.Document, fieldDefinitionRef int, argumentName string) int {
+	for _, argRef := range definition.FieldDefinitionArgumentsDefinitions(fieldDefinitionRef) {
+		if definition.InputValueDefinitionNameString(argRef) == argumentName {
+			return argRef
+		}
+	}
+	return ast.InvalidRef
+}
+
+func validateArgumentPath(definition *ast.Document, typeRef int, path []string) error {
+	current := typeRef
+	for i, segment := range path {
+		if !isInputObject(definition, current) {
+			return fmt.Errorf("cannot traverse field %q because %s is not an input object", segment, definition.TypeNameString(current))
+		}
+		typeName := definition.TypeNameString(current)
+		node, ok := definition.Index.FirstNodeByNameStr(typeName)
+		if !ok {
+			return fmt.Errorf("input type %s not found", typeName)
+		}
+		fieldRef := definition.InputObjectTypeDefinitionInputValueDefinitionByName(node.Ref, []byte(segment))
+		if fieldRef == ast.InvalidRef {
+			return fmt.Errorf("field %q does not exist on %s", segment, definition.TypeNameString(current))
+		}
+		current = definition.InputValueDefinitions[fieldRef].Type
+		if i < len(path)-1 {
+			if !isInputObject(definition, current) {
+				return fmt.Errorf("cannot traverse beyond %q because %s is not an input object", segment, definition.TypeNameString(current))
+			}
+		}
+	}
+	if !isLeaf(definition, current) {
+		return fmt.Errorf("argument path must end on a leaf type but %s is not a leaf", definition.TypeNameString(current))
+	}
+	return nil
+}
+
+func isInputObject(definition *ast.Document, typeRef int) bool {
+	base := unwrapNonNull(definition, typeRef)
+	if definition.TypeIsList(base) {
+		return false
+	}
+	node, exists := definition.Index.FirstNodeByNameStr(definition.TypeNameString(base))
+	return exists && node.Kind == ast.NodeKindInputObjectTypeDefinition
+}
+
+func isLeaf(definition *ast.Document, typeRef int) bool {
+	base := definition.ResolveUnderlyingType(typeRef)
+	name := definition.TypeNameString(base)
+	node, exists := definition.Index.FirstNodeByNameStr(name)
+	if exists {
+		return node.Kind == ast.NodeKindScalarTypeDefinition || node.Kind == ast.NodeKindEnumTypeDefinition
+	}
+	switch name {
+	case "Boolean", "String", "ID", "Int", "Float":
+		return true
+	}
+	return false
+}
+
+func unwrapNonNull(definition *ast.Document, typeRef int) int {
+	for definition.Types[typeRef].TypeKind == ast.TypeKindNonNull {
+		typeRef = definition.Types[typeRef].OfType
+	}
+	return typeRef
+}
+
+func OperationDefinitionRef(walker *astvisitor.Walker) (int, bool) {
+	for _, ancestor := range walker.Ancestors {
+		if ancestor.Kind == ast.NodeKindOperationDefinition {
+			return ancestor.Ref, true
+		}
+	}
+	return ast.InvalidRef, false
+}
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/plan/configuration_visitor.go b/app/v2/pkg/engine/plan/configuration_visitor.go
index b5e2ed1..682c76d 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-1-codex-mini/app/v2/pkg/engine/plan/configuration_visitor.go
+++ b/app/v2/pkg/engine/plan/configuration_visitor.go
@@ -1,9 +1,7 @@
 package plan
 
 import (
-	"bytes"
 	"fmt"
-	"regexp"
 	"slices"
 	"strings"
 
@@ -870,85 +868,42 @@ func (c *configurationVisitor) buildSubscriptionFilterCondition(condition Subscr
 	return filter
 }
 
-var (
-	// subscriptionFieldFilterRegex is used to extract the variable name from the subscription filter condition
-	// e.g. {{ args.id }} -> id
-	// e.g. {{ args.input.id }} -> input.id
-	subscriptionFieldFilterRegex = regexp.MustCompile(`{{\s*args((?:\.[a-zA-Z0-9_]+)+)\s*}}`)
-)
-
-// ContainsTemplateString checks if the value contains a template string
-func ContainsTemplateString(value []byte) bool {
-	return bytes.Contains(value, []byte("{{"))
-}
-
 func (c *configurationVisitor) buildSubscriptionFieldFilter(condition *SubscriptionFieldCondition) *resolve.SubscriptionFieldFilter {
 	filter := &resolve.SubscriptionFieldFilter{}
 	filter.FieldPath = condition.FieldPath
 	filter.Values = make([]resolve.InputTemplate, len(condition.Values))
+	fieldDefinitionRef, ok := c.walker.FieldDefinition(c.fieldRef)
+	if !ok {
+		c.walker.StopWithInternalErr(fmt.Errorf("field definition is not available for subscription filter"))
+		return nil
+	}
+	operationDefinitionRef, ok := OperationDefinitionRef(c.walker)
+	if !ok {
+		c.walker.StopWithInternalErr(fmt.Errorf("operation definition is not available for subscription filter"))
+		return nil
+	}
 	for i, value := range condition.Values {
-		matches := subscriptionFieldFilterRegex.FindAllStringSubmatchIndex(value, -1)
-		if len(matches) == 0 {
-			filter.Values[i].Segments = []resolve.TemplateSegment{
-				{
-					SegmentType: resolve.StaticSegmentType,
-					Data:        []byte(value),
-				},
-			}
-			continue
+		template, err := ParseArgumentTemplate(c.operation, c.definition, operationDefinitionRef, c.fieldRef, fieldDefinitionRef, []byte(value))
+		if err != nil {
+			c.walker.StopWithInternalErr(fmt.Errorf("failed to parse subscription filter value %q: %w", value, err))
+			return nil
 		}
-		if len(matches) == 1 && len(matches[0]) == 4 {
-			prefix := value[:matches[0][0]]
-			hasPrefix := len(prefix) > 0
-			// the path begins with ".", so ignore the first empty string element with trailing [1:]
-			argumentPath := strings.Split(value[matches[0][2]:matches[0][3]][1:], ".")
-			argumentName := argumentPath[0]
-			argumentRef, ok := c.operation.FieldArgument(c.fieldRef, []byte(argumentName))
-			if !ok {
-				c.walker.StopWithInternalErr(fmt.Errorf(`field argument "%s" is not defined`, argumentName))
-				return nil
-			}
-			argumentValue := c.operation.ArgumentValue(argumentRef)
-			if argumentValue.Kind != ast.ValueKindVariable {
-				c.walker.StopWithInternalErr(fmt.Errorf(`expected argument "%s" kind to be "ValueKindVariable" but received "%s"`, argumentName, argumentValue.Kind))
-				return nil
-			}
-			variableName := c.operation.VariableValueNameString(argumentValue.Ref)
-			// the variable path should be the variable name, e.g., "a", and then the 2nd element from the path onwards
-			variablePath := append([]string{variableName}, argumentPath[1:]...)
-			suffix := value[matches[0][1]:]
-			hasSuffix := len(suffix) > 0
-			size := 1
-			if hasPrefix {
-				size++
-			}
-			if hasSuffix {
-				size++
-			}
-			filter.Values[i].Segments = make([]resolve.TemplateSegment, size)
-			idx := 0
-			if hasPrefix {
-				filter.Values[i].Segments[idx] = resolve.TemplateSegment{
+		filter.Values[i].Segments = make([]resolve.TemplateSegment, 0, len(template.Segments))
+		for _, segment := range template.Segments {
+			if segment.Variable == nil {
+				filter.Values[i].Segments = append(filter.Values[i].Segments, resolve.TemplateSegment{
 					SegmentType: resolve.StaticSegmentType,
-					Data:        []byte(prefix),
-				}
-				idx++
+					Data:        append([]byte(nil), segment.Static...),
+				})
+				continue
 			}
-			filter.Values[i].Segments[idx] = resolve.TemplateSegment{
+			filter.Values[i].Segments = append(filter.Values[i].Segments, resolve.TemplateSegment{
 				SegmentType:        resolve.VariableSegmentType,
 				VariableKind:       resolve.ContextVariableKind,
+				VariableSourcePath: append([]string(nil), segment.Variable.VariablePath...),
 				Renderer:           resolve.NewPlainVariableRenderer(),
-				VariableSourcePath: variablePath,
-			}
-			if hasSuffix {
-				filter.Values[i].Segments[idx+1] = resolve.TemplateSegment{
-					SegmentType: resolve.StaticSegmentType,
-					Data:        []byte(suffix),
-				}
-			}
-			continue
+			})
 		}
-		return nil
 	}
 	return filter
 }