STET

agent_patch

runs/2026-02-28__13-38-10__gpt-5-4/flux-pr-817/flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/agent-logs/agent.patch

29938 bytes

Back to adjudication
diff --git a/app/v2/pkg/engine/argumenttemplate/argument_template.go b/app/v2/pkg/engine/argumenttemplate/argument_template.go
new file mode 100644
index 0000000..8c9de81
--- /dev/null
+++ b/app/v2/pkg/engine/argumenttemplate/argument_template.go
@@ -0,0 +1,172 @@
+package argumenttemplate
+
+import (
+	"fmt"
+	"regexp"
+	"strings"
+	"unicode"
+
+	"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
+)
+
+var templateRegex = regexp.MustCompile(`{{\s*args((?:\.[a-zA-Z0-9_]+)+)\s*}}`)
+
+func Parse(template string, fieldRef, operationDefinitionRef int, operation, definition *ast.Document) (resolve.InputTemplate, error) {
+	inputTemplate := resolve.InputTemplate{}
+	matches := templateRegex.FindAllStringSubmatchIndex(template, -1)
+	if len(matches) == 0 {
+		inputTemplate.Segments = []resolve.TemplateSegment{{
+			SegmentType: resolve.StaticSegmentType,
+			Data:        []byte(template),
+		}}
+		return inputTemplate, nil
+	}
+
+	segments := make([]resolve.TemplateSegment, 0, len(matches)*2+1)
+	lastEnd := 0
+
+	for _, match := range matches {
+		if len(match) != 4 {
+			return resolve.InputTemplate{}, fmt.Errorf("invalid argument template")
+		}
+		if match[0] > lastEnd {
+			segments = append(segments, resolve.TemplateSegment{
+				SegmentType: resolve.StaticSegmentType,
+				Data:        []byte(template[lastEnd:match[0]]),
+			})
+		}
+
+		argumentPath := strings.Split(template[match[2]:match[3]][1:], ".")
+		segment, err := resolveArgumentTemplateSegment(argumentPath, fieldRef, operationDefinitionRef, operation, definition)
+		if err != nil {
+			return resolve.InputTemplate{}, err
+		}
+		segments = append(segments, segment)
+		lastEnd = match[1]
+	}
+
+	if lastEnd < len(template) {
+		segments = append(segments, resolve.TemplateSegment{
+			SegmentType: resolve.StaticSegmentType,
+			Data:        []byte(template[lastEnd:]),
+		})
+	}
+
+	inputTemplate.Segments = segments
+	return inputTemplate, nil
+}
+
+func ValidateNATSSubject(subject string) error {
+	if subject == "" {
+		return fmt.Errorf("subject must not be empty")
+	}
+	if strings.IndexFunc(subject, unicode.IsSpace) != -1 {
+		return fmt.Errorf("subject must not contain whitespace")
+	}
+
+	tokens := strings.Split(subject, ".")
+	for i, token := range tokens {
+		if token == "" {
+			return fmt.Errorf("subject must not contain empty tokens")
+		}
+		if strings.Contains(token, ">") && token != ">" {
+			return fmt.Errorf("token %q must not contain partial wildcard '>'", token)
+		}
+		if strings.Contains(token, "*") && token != "*" {
+			return fmt.Errorf("token %q must not contain partial wildcard '*'", token)
+		}
+		if token == ">" && i != len(tokens)-1 {
+			return fmt.Errorf("token \">\" must be the last token")
+		}
+	}
+
+	return nil
+}
+
+func resolveArgumentTemplateSegment(argumentPath []string, fieldRef, operationDefinitionRef int, operation, definition *ast.Document) (resolve.TemplateSegment, error) {
+	argumentName := argumentPath[0]
+	argumentRef, ok := operation.FieldArgument(fieldRef, []byte(argumentName))
+	if !ok {
+		return resolve.TemplateSegment{}, fmt.Errorf("field argument %q is not defined", argumentName)
+	}
+
+	argumentValue := operation.ArgumentValue(argumentRef)
+	if argumentValue.Kind != ast.ValueKindVariable {
+		return resolve.TemplateSegment{}, fmt.Errorf("expected argument %q kind to be %q but received %q", argumentName, ast.ValueKindVariable, argumentValue.Kind)
+	}
+
+	variableName := operation.VariableValueNameString(argumentValue.Ref)
+	variableDefinition, ok := operation.VariableDefinitionByNameAndOperation(operationDefinitionRef, operation.VariableValueNameBytes(argumentValue.Ref))
+	if !ok {
+		return resolve.TemplateSegment{}, fmt.Errorf("expected definition to exist for variable %q", variableName)
+	}
+
+	variableTypeRef := operation.VariableDefinitions[variableDefinition].Type
+	if err := validateArgumentPath(argumentPath, variableTypeRef, operation, definition); err != nil {
+		return resolve.TemplateSegment{}, err
+	}
+
+	variablePath := append([]string{variableName}, argumentPath[1:]...)
+
+	return resolve.TemplateSegment{
+		SegmentType:        resolve.VariableSegmentType,
+		VariableKind:       resolve.ContextVariableKind,
+		VariableSourcePath: variablePath,
+		Renderer:           resolve.NewPlainVariableRenderer(),
+	}, nil
+}
+
+func validateArgumentPath(argumentPath []string, variableTypeRef int, operation, definition *ast.Document) error {
+	currentTypeRef := variableTypeRef
+	currentTypeDoc := operation
+
+	for i := 1; i < len(argumentPath); i++ {
+		inputObjectNode, err := inputObjectNodeForTypeRef(currentTypeRef, currentTypeDoc, definition)
+		if err != nil {
+			return fmt.Errorf("argument template %q is invalid: %w", "args."+strings.Join(argumentPath, "."), err)
+		}
+
+		fieldName := argumentPath[i]
+		inputFieldRef := definition.InputObjectTypeDefinitionInputValueDefinitionByName(inputObjectNode.Ref, []byte(fieldName))
+		if inputFieldRef == -1 {
+			return fmt.Errorf("argument template %q is invalid: input field %q does not exist on %q", "args."+strings.Join(argumentPath, "."), fieldName, definition.InputObjectTypeDefinitionNameString(inputObjectNode.Ref))
+		}
+		currentTypeRef = definition.InputValueDefinitions[inputFieldRef].Type
+		currentTypeDoc = definition
+	}
+
+	if err := ensureLeafType(currentTypeRef, currentTypeDoc, definition); err != nil {
+		return fmt.Errorf("argument template %q is invalid: %w", "args."+strings.Join(argumentPath, "."), err)
+	}
+
+	return nil
+}
+
+func inputObjectNodeForTypeRef(typeRef int, typeDoc, definition *ast.Document) (ast.Node, error) {
+	typeName := typeDoc.ResolveTypeNameString(typeRef)
+	if node, ok := definition.Index.FirstNodeByNameStr(typeName); ok && node.Kind == ast.NodeKindInputObjectTypeDefinition {
+		return node, nil
+	}
+	if node, ok := definition.Index.FirstNodeByNameStr(typeName); ok {
+		return ast.Node{}, fmt.Errorf("type %q is %q, expected INPUT_OBJECT", typeName, node.Kind)
+	}
+	return ast.Node{}, fmt.Errorf("type %q does not exist in the schema", typeName)
+}
+
+func ensureLeafType(typeRef int, typeDoc, definition *ast.Document) error {
+	typeName := typeDoc.ResolveTypeNameString(typeRef)
+	node, ok := definition.Index.FirstNodeByNameStr(typeName)
+	if !ok {
+		return fmt.Errorf("type %q does not exist in the schema", typeName)
+	}
+
+	switch node.Kind {
+	case ast.NodeKindScalarTypeDefinition, ast.NodeKindEnumTypeDefinition:
+		return nil
+	case ast.NodeKindInputObjectTypeDefinition:
+		return fmt.Errorf("type %q is an INPUT_OBJECT, expected a leaf type", typeName)
+	default:
+		return fmt.Errorf("type %q is %q, expected a leaf input type", typeName, node.Kind)
+	}
+}
diff --git a/app/v2/pkg/engine/argumenttemplate/argument_template_test.go b/app/v2/pkg/engine/argumenttemplate/argument_template_test.go
new file mode 100644
index 0000000..da35390
--- /dev/null
+++ b/app/v2/pkg/engine/argumenttemplate/argument_template_test.go
@@ -0,0 +1,84 @@
+package argumenttemplate
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+	"github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform"
+	"github.com/wundergraph/graphql-go-tools/v2/pkg/internal/unsafeparser"
+)
+
+func TestParse(t *testing.T) {
+	definition := unsafeparser.ParseGraphqlDocumentString(`
+		schema { subscription: Subscription }
+		type Subscription {
+			heroByID(id: ID!): Hero
+			heroByIDInput(input: Input!): Hero
+		}
+		input Input {
+			nested: NestedInput!
+			leaf: String!
+		}
+		input NestedInput {
+			value: ID!
+			sub: DeepInput!
+		}
+		input DeepInput {
+			foo: String!
+		}
+		type Hero {
+			id: ID!
+		}
+	`)
+	require.NoError(t, asttransform.MergeDefinitionWithBaseSchema(&definition))
+
+	operation := unsafeparser.ParseGraphqlDocumentString(`
+		subscription Test($id: ID!, $input: Input!) {
+			heroByID(id: $id) { id }
+			heroByIDInput(input: $input) { id }
+		}
+	`)
+
+	t.Run("valid nested argument path", func(t *testing.T) {
+		fieldRef := findFieldRefByName(t, &operation, "heroByIDInput")
+		template, err := Parse("{{ args.input.nested.value }}", fieldRef, 0, &operation, &definition)
+		require.NoError(t, err)
+		require.Len(t, template.Segments, 1)
+		assert.Equal(t, []string{"input", "nested", "value"}, template.Segments[0].VariableSourcePath)
+	})
+
+	t.Run("invalid nested argument path", func(t *testing.T) {
+		fieldRef := findFieldRefByName(t, &operation, "heroByIDInput")
+		_, err := Parse("{{ args.input.missing.value }}", fieldRef, 0, &operation, &definition)
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), `input field "missing" does not exist on "Input"`)
+	})
+
+	t.Run("invalid non leaf argument path", func(t *testing.T) {
+		fieldRef := findFieldRefByName(t, &operation, "heroByIDInput")
+		_, err := Parse("{{ args.input.nested }}", fieldRef, 0, &operation, &definition)
+		require.Error(t, err)
+		assert.Contains(t, err.Error(), `type "NestedInput" is an INPUT_OBJECT, expected a leaf type`)
+	})
+}
+
+func TestValidateNATSSubject(t *testing.T) {
+	assert.NoError(t, ValidateNATSSubject("foo.bar"))
+	assert.NoError(t, ValidateNATSSubject("foo.*"))
+	assert.ErrorContains(t, ValidateNATSSubject("foo..bar"), "empty tokens")
+	assert.ErrorContains(t, ValidateNATSSubject("foo.>.bar"), `must be the last token`)
+	assert.ErrorContains(t, ValidateNATSSubject("foo.b*ar"), "partial wildcard")
+}
+
+func findFieldRefByName(t *testing.T, operation *ast.Document, fieldName string) int {
+	t.Helper()
+	for i := range operation.Fields {
+		if operation.FieldNameString(i) == fieldName {
+			return i
+		}
+	}
+	t.Fatalf("field %q not found", fieldName)
+	return -1
+}
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go b/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
index 1cb2914..4391791 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
@@ -3,7 +3,9 @@ package pubsub_datasource
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+	"strings"
+
+	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argumenttemplate"
 	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
 	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
 	"slices"
@@ -35,37 +37,34 @@ type NatsEventManager struct {
 }
 
 func (p *NatsEventManager) extractEventSubject(ref int, subject string) (string, error) {
-	matches := eventSubjectRegex.FindAllStringSubmatch(subject, -1)
-	if len(matches) != 1 || len(matches[0]) != 2 {
-		return "", fmt.Errorf("expected subject to match regex")
-	}
-	argumentName := matches[0][1]
-	// We need to find the argument in the operation
-	argumentRef, ok := p.visitor.Operation.FieldArgument(ref, []byte(argumentName))
-	if !ok {
-		return "", fmt.Errorf("argument \"%s\" is not defined", argumentName)
-	}
-	argumentValue := p.visitor.Operation.ArgumentValue(argumentRef)
-	if argumentValue.Kind != ast.ValueKindVariable {
-		return "", fmt.Errorf("expected argument \"%s\" kind to be \"ValueKindVariable\" but received \"%s\"", argumentName, argumentValue.Kind)
-	}
-	variableName := p.visitor.Operation.VariableValueNameBytes(argumentValue.Ref)
-	variableDefinition, ok := p.visitor.Operation.VariableDefinitionByNameAndOperation(p.visitor.Walker.Ancestors[0].Ref, variableName)
-	if !ok {
-		return "", fmt.Errorf("expected definition to exist for variable \"%s\"", variableName)
-	}
-	variableTypeRef := p.visitor.Operation.VariableDefinitions[variableDefinition].Type
-	renderer, err := resolve.NewPlainVariableRendererWithValidationFromTypeRef(p.visitor.Operation, p.visitor.Definition, variableTypeRef, string(variableName))
+	inputTemplate, err := argumenttemplate.Parse(subject, ref, p.visitor.Walker.Ancestors[0].Ref, p.visitor.Operation, p.visitor.Definition)
 	if err != nil {
 		return "", err
 	}
-	contextVariable := &resolve.ContextVariable{
-		Path:     []string{string(variableName)},
-		Renderer: renderer,
+
+	var subjectBuilder strings.Builder
+	for _, segment := range inputTemplate.Segments {
+		switch segment.SegmentType {
+		case resolve.StaticSegmentType:
+			subjectBuilder.Write(segment.Data)
+		case resolve.VariableSegmentType:
+			contextVariable := &resolve.ContextVariable{
+				Path:     segment.VariableSourcePath,
+				Renderer: segment.Renderer,
+			}
+			variablePlaceholder, _ := p.variables.AddVariable(contextVariable)
+			subjectBuilder.WriteString(variablePlaceholder)
+		default:
+			return "", fmt.Errorf("unsupported template segment type: %d", segment.SegmentType)
+		}
 	}
-	// We need to replace the template literal with the variable placeholder (and reuse if it already exists)
-	variablePlaceHolder, _ := p.variables.AddVariable(contextVariable) // $$0$$
-	return eventSubjectRegex.ReplaceAllLiteralString(subject, variablePlaceHolder), nil
+
+	extractedSubject := subjectBuilder.String()
+	if err := argumenttemplate.ValidateNATSSubject(strings.ReplaceAll(extractedSubject, "$$", "x")); err != nil {
+		return "", fmt.Errorf("invalid NATS subject %q: %w", subject, err)
+	}
+
+	return extractedSubject, nil
 }
 
 func (p *NatsEventManager) eventDataBytes(ref int) ([]byte, error) {
@@ -77,15 +76,15 @@ func (p *NatsEventManager) handlePublishAndRequestEvent(ref int) {
 		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("publish and request events should define one subject but received %d", len(p.eventConfiguration.Subjects)))
 		return
 	}
-	rawSubject := p.eventConfiguration.Subjects[0]
-	extractedSubject, err := p.extractEventSubject(ref, rawSubject)
+	dataBytes, err := p.eventDataBytes(ref)
 	if err != nil {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("could not extract event subject: %w", err))
+		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to write event data bytes: %w", err))
 		return
 	}
-	dataBytes, err := p.eventDataBytes(ref)
+	rawSubject := p.eventConfiguration.Subjects[0]
+	extractedSubject, err := p.extractEventSubject(ref, rawSubject)
 	if err != nil {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to write event data bytes: %w", err))
+		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("could not extract event subject: %w", err))
 		return
 	}
 
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
index 02b05d3..4c62615 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"regexp"
 	"strings"
 
 	"github.com/jensneuse/abstractlogger"
@@ -23,8 +22,6 @@ const (
 	EventTypeSubscribe EventType = "subscribe"
 )
 
-var eventSubjectRegex = regexp.MustCompile(`{{ args.([a-zA-Z0-9_]+) }}`)
-
 func EventTypeFromString(s string) (EventType, error) {
 	et := EventType(strings.ToLower(s))
 	switch et {
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
index 2c817b1..b11c8f7 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
@@ -41,9 +41,15 @@ func TestPubSub(t *testing.T) {
 		helloMutation(id: String!, input: String!): String! @eventsPublish(subject: "helloMutation.{{ args.id }}")
 	}
 
+	input CompoundInput {
+		firstId: String!
+	}
+
 	type Subscription {
 		helloSubscription(id: String!): String! @eventsSubscribe(subjects: ["helloSubscription.{{ args.id }}"])
 		subscriptionWithMultipleSubjects(firstId: String!, secondId: String!): String! @eventsSubscribe(subjects: ["firstSubscription.{{ args.firstId }}", "secondSubscription.{{ args.secondId }}"])
+		subscriptionWithCompoundSubject(firstId: String!, secondId: String!): String! @eventsSubscribe(subjects: ["compound.{{ args.firstId }}.{{ args.secondId }}"])
+		subscriptionWithNestedSubject(input: CompoundInput!): String! @eventsSubscribe(subjects: ["nested.{{ args.input.firstId }}"])
 	}`
 
 	dataSourceCustomConfig := Configuration{
@@ -92,6 +98,28 @@ func TestPubSub(t *testing.T) {
 					Subjects: []string{"firstSubscription.{{ args.firstId }}", "secondSubscription.{{ args.secondId }}"},
 				},
 			},
+			{
+				Metadata: &EventMetadata{
+					ProviderID: "default",
+					FieldName:  "subscriptionWithCompoundSubject",
+					Type:       EventTypeSubscribe,
+					TypeName:   "Subscription",
+				},
+				Configuration: &NatsEventConfiguration{
+					Subjects: []string{"compound.{{ args.firstId }}.{{ args.secondId }}"},
+				},
+			},
+			{
+				Metadata: &EventMetadata{
+					ProviderID: "default",
+					FieldName:  "subscriptionWithNestedSubject",
+					Type:       EventTypeSubscribe,
+					TypeName:   "Subscription",
+				},
+				Configuration: &NatsEventConfiguration{
+					Subjects: []string{"nested.{{ args.input.firstId }}"},
+				},
+			},
 		},
 	}
 
@@ -116,6 +144,10 @@ func TestPubSub(t *testing.T) {
 					TypeName:   "Subscription",
 					FieldNames: []string{"subscriptionWithMultipleSubjects"},
 				},
+				{
+					TypeName:   "Subscription",
+					FieldNames: []string{"subscriptionWithCompoundSubject", "subscriptionWithNestedSubject"},
+				},
 			},
 		},
 		dataSourceCustomConfig,
@@ -171,6 +203,30 @@ func TestPubSub(t *testing.T) {
 					},
 				},
 			},
+			{
+				TypeName:  "Subscription",
+				FieldName: "subscriptionWithCompoundSubject",
+				Arguments: []plan.ArgumentConfiguration{
+					{
+						Name:       "firstId",
+						SourceType: plan.FieldArgumentSource,
+					},
+					{
+						Name:       "secondId",
+						SourceType: plan.FieldArgumentSource,
+					},
+				},
+			},
+			{
+				TypeName:  "Subscription",
+				FieldName: "subscriptionWithNestedSubject",
+				Arguments: []plan.ArgumentConfiguration{
+					{
+						Name:       "input",
+						SourceType: plan.FieldArgumentSource,
+					},
+				},
+			},
 		},
 		DisableResolveFieldPositions: true,
 	}
@@ -265,7 +321,7 @@ func TestPubSub(t *testing.T) {
 					Variables: resolve.Variables{
 						&resolve.ContextVariable{
 							Path:     []string{"a"},
-							Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+							Renderer: resolve.NewPlainVariableRenderer(),
 						},
 					},
 					Source: &NatsSubscriptionSource{
@@ -302,11 +358,11 @@ func TestPubSub(t *testing.T) {
 					Variables: resolve.Variables{
 						&resolve.ContextVariable{
 							Path:     []string{"a"},
-							Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+							Renderer: resolve.NewPlainVariableRenderer(),
 						},
 						&resolve.ContextVariable{
 							Path:     []string{"b"},
-							Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+							Renderer: resolve.NewPlainVariableRenderer(),
 						},
 					},
 					Source: &NatsSubscriptionSource{
@@ -332,4 +388,82 @@ func TestPubSub(t *testing.T) {
 		}
 		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
 	})
+
+	t.Run("subscription with compound subject", func(t *testing.T) {
+		const operation = "subscription SubscriptionWithCompoundSubject { subscriptionWithCompoundSubject(firstId: 11, secondId: 23) }"
+		const operationName = `SubscriptionWithCompoundSubject`
+		expect := &plan.SubscriptionResponsePlan{
+			Response: &resolve.GraphQLSubscription{
+				Trigger: resolve.GraphQLSubscriptionTrigger{
+					Input: []byte(`{"providerId":"default","subjects":["compound.$$0$$.$$1$$"]}`),
+					Variables: resolve.Variables{
+						&resolve.ContextVariable{
+							Path:     []string{"a"},
+							Renderer: resolve.NewPlainVariableRenderer(),
+						},
+						&resolve.ContextVariable{
+							Path:     []string{"b"},
+							Renderer: resolve.NewPlainVariableRenderer(),
+						},
+					},
+					Source: &NatsSubscriptionSource{
+						pubSub: &testPubsub{},
+					},
+					PostProcessing: resolve.PostProcessingConfiguration{
+						MergePath: []string{"subscriptionWithCompoundSubject"},
+					},
+				},
+				Response: &resolve.GraphQLResponse{
+					Data: &resolve.Object{
+						Fields: []*resolve.Field{
+							{
+								Name: []byte("subscriptionWithCompoundSubject"),
+								Value: &resolve.String{
+									Path: []string{"subscriptionWithCompoundSubject"},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
+	})
+
+	t.Run("subscription with nested subject", func(t *testing.T) {
+		const operation = "subscription SubscriptionWithNestedSubject { subscriptionWithNestedSubject(input: { firstId: \"11\" }) }"
+		const operationName = `SubscriptionWithNestedSubject`
+		expect := &plan.SubscriptionResponsePlan{
+			Response: &resolve.GraphQLSubscription{
+				Trigger: resolve.GraphQLSubscriptionTrigger{
+					Input: []byte(`{"providerId":"default","subjects":["nested.$$0$$"]}`),
+					Variables: resolve.Variables{
+						&resolve.ContextVariable{
+							Path:     []string{"a", "firstId"},
+							Renderer: resolve.NewPlainVariableRenderer(),
+						},
+					},
+					Source: &NatsSubscriptionSource{
+						pubSub: &testPubsub{},
+					},
+					PostProcessing: resolve.PostProcessingConfiguration{
+						MergePath: []string{"subscriptionWithNestedSubject"},
+					},
+				},
+				Response: &resolve.GraphQLResponse{
+					Data: &resolve.Object{
+						Fields: []*resolve.Field{
+							{
+								Name: []byte("subscriptionWithNestedSubject"),
+								Value: &resolve.String{
+									Path: []string{"subscriptionWithNestedSubject"},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
+	})
 }
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/plan/configuration_visitor.go b/app/v2/pkg/engine/plan/configuration_visitor.go
index b5e2ed1..56384c3 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/plan/configuration_visitor.go
+++ b/app/v2/pkg/engine/plan/configuration_visitor.go
@@ -3,7 +3,6 @@ package plan
 import (
 	"bytes"
 	"fmt"
-	"regexp"
 	"slices"
 	"strings"
 
@@ -12,6 +11,7 @@ import (
 
 	"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
 	"github.com/wundergraph/graphql-go-tools/v2/pkg/astvisitor"
+	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argumenttemplate"
 	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
 )
 
@@ -870,13 +870,6 @@ func (c *configurationVisitor) buildSubscriptionFilterCondition(condition Subscr
 	return filter
 }
 
-var (
-	// subscriptionFieldFilterRegex is used to extract the variable name from the subscription filter condition
-	// e.g. {{ args.id }} -> id
-	// e.g. {{ args.input.id }} -> input.id
-	subscriptionFieldFilterRegex = regexp.MustCompile(`{{\s*args((?:\.[a-zA-Z0-9_]+)+)\s*}}`)
-)
-
 // ContainsTemplateString checks if the value contains a template string
 func ContainsTemplateString(value []byte) bool {
 	return bytes.Contains(value, []byte("{{"))
@@ -887,68 +880,17 @@ func (c *configurationVisitor) buildSubscriptionFieldFilter(condition *Subscript
 	filter.FieldPath = condition.FieldPath
 	filter.Values = make([]resolve.InputTemplate, len(condition.Values))
 	for i, value := range condition.Values {
-		matches := subscriptionFieldFilterRegex.FindAllStringSubmatchIndex(value, -1)
-		if len(matches) == 0 {
-			filter.Values[i].Segments = []resolve.TemplateSegment{
-				{
-					SegmentType: resolve.StaticSegmentType,
-					Data:        []byte(value),
-				},
-			}
-			continue
+		inputTemplate, err := argumenttemplate.Parse(value, c.fieldRef, c.walker.Ancestors[0].Ref, c.operation, c.definition)
+		if err != nil {
+			c.walker.StopWithInternalErr(err)
+			return nil
 		}
-		if len(matches) == 1 && len(matches[0]) == 4 {
-			prefix := value[:matches[0][0]]
-			hasPrefix := len(prefix) > 0
-			// the path begins with ".", so ignore the first empty string element with trailing [1:]
-			argumentPath := strings.Split(value[matches[0][2]:matches[0][3]][1:], ".")
-			argumentName := argumentPath[0]
-			argumentRef, ok := c.operation.FieldArgument(c.fieldRef, []byte(argumentName))
-			if !ok {
-				c.walker.StopWithInternalErr(fmt.Errorf(`field argument "%s" is not defined`, argumentName))
-				return nil
-			}
-			argumentValue := c.operation.ArgumentValue(argumentRef)
-			if argumentValue.Kind != ast.ValueKindVariable {
-				c.walker.StopWithInternalErr(fmt.Errorf(`expected argument "%s" kind to be "ValueKindVariable" but received "%s"`, argumentName, argumentValue.Kind))
-				return nil
-			}
-			variableName := c.operation.VariableValueNameString(argumentValue.Ref)
-			// the variable path should be the variable name, e.g., "a", and then the 2nd element from the path onwards
-			variablePath := append([]string{variableName}, argumentPath[1:]...)
-			suffix := value[matches[0][1]:]
-			hasSuffix := len(suffix) > 0
-			size := 1
-			if hasPrefix {
-				size++
-			}
-			if hasSuffix {
-				size++
+		for j := range inputTemplate.Segments {
+			if inputTemplate.Segments[j].SegmentType == resolve.VariableSegmentType {
+				inputTemplate.Segments[j].Renderer = resolve.NewPlainVariableRenderer()
 			}
-			filter.Values[i].Segments = make([]resolve.TemplateSegment, size)
-			idx := 0
-			if hasPrefix {
-				filter.Values[i].Segments[idx] = resolve.TemplateSegment{
-					SegmentType: resolve.StaticSegmentType,
-					Data:        []byte(prefix),
-				}
-				idx++
-			}
-			filter.Values[i].Segments[idx] = resolve.TemplateSegment{
-				SegmentType:        resolve.VariableSegmentType,
-				VariableKind:       resolve.ContextVariableKind,
-				Renderer:           resolve.NewPlainVariableRenderer(),
-				VariableSourcePath: variablePath,
-			}
-			if hasSuffix {
-				filter.Values[i].Segments[idx+1] = resolve.TemplateSegment{
-					SegmentType: resolve.StaticSegmentType,
-					Data:        []byte(suffix),
-				}
-			}
-			continue
 		}
-		return nil
+		filter.Values[i] = inputTemplate
 	}
 	return filter
 }
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/plan/subscription_filter_test.go b/app/v2/pkg/engine/plan/subscription_filter_test.go
index d6c51d9..d6eed77 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/plan/subscription_filter_test.go
+++ b/app/v2/pkg/engine/plan/subscription_filter_test.go
@@ -262,22 +262,49 @@ func TestPlanSubscriptionFilter(t *testing.T) {
 		},
 	))
 
-	t.Run("subscription with IN field invalid filter multiple templates", test(
+	t.Run("subscription with IN field filter multiple templates", test(
 		schema, `
-				subscription { heroByID(id: "1") { id name } }
+				subscription { heroByIDMultipleArgs(one: 1, two: 2.5) { id name } }
 			`, "",
 		&SubscriptionResponsePlan{
 			Response: &resolve.GraphQLSubscription{
 				Trigger: resolve.GraphQLSubscriptionTrigger{
 					Input: []byte{},
 				},
+				Filter: &resolve.SubscriptionFilter{
+					In: &resolve.SubscriptionFieldFilter{
+						FieldPath: []string{"id"},
+						Values: []resolve.InputTemplate{
+							{
+								Segments: []resolve.TemplateSegment{
+									{
+										SegmentType:        resolve.VariableSegmentType,
+										VariableKind:       resolve.ContextVariableKind,
+										VariableSourcePath: []string{"a"},
+										Renderer:           resolve.NewPlainVariableRenderer(),
+									},
+									{
+										SegmentType: resolve.StaticSegmentType,
+										Data:        []byte("."),
+									},
+									{
+										SegmentType:        resolve.VariableSegmentType,
+										VariableKind:       resolve.ContextVariableKind,
+										VariableSourcePath: []string{"b"},
+										Renderer:           resolve.NewPlainVariableRenderer(),
+									},
+								},
+							},
+						},
+					},
+				},
 				Response: &resolve.GraphQLResponse{
 					Data: &resolve.Object{
 						Fields: []*resolve.Field{
 							{
-								Name: []byte("heroByID"),
+								Name: []byte("heroByIDMultipleArgs"),
 								Value: &resolve.Object{
-									Path:     []string{"heroByID"},
+									Path:     []string{"heroByIDMultipleArgs"},
 									Nullable: true,
 									Fields: []*resolve.Field{
 										{
@@ -306,19 +333,24 @@ func TestPlanSubscriptionFilter(t *testing.T) {
 			Fields: []FieldConfiguration{
 				{
 					TypeName:  "Subscription",
-					FieldName: "heroByID",
-					Path:      []string{"heroByID"},
+					FieldName: "heroByIDMultipleArgs",
+					Path:      []string{"heroByIDMultipleArgs"},
 					Arguments: []ArgumentConfiguration{
 						{
-							Name:       "id",
+							Name:       "one",
 							SourceType: FieldArgumentSource,
-							SourcePath: []string{"id"},
+							SourcePath: []string{"one"},
+						},
+						{
+							Name:       "two",
+							SourceType: FieldArgumentSource,
+							SourcePath: []string{"two"},
 						},
 					},
 					SubscriptionFilterCondition: &SubscriptionFilterCondition{
 						In: &SubscriptionFieldCondition{
 							FieldPath: []string{"id"},
-							Values:    []string{"{{ args.a }}.{{ args.b }}"},
+							Values:    []string{"{{ args.one }}.{{ args.two }}"},
 						},
 					},
 				},