agent_patch
runs/2026-02-28__13-38-10__gpt-5-4/flux-pr-817/flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/agent-logs/agent.patch
29938 bytes
diff --git a/app/v2/pkg/engine/argumenttemplate/argument_template.go b/app/v2/pkg/engine/argumenttemplate/argument_template.go
new file mode 100644
index 0000000..8c9de81
--- /dev/null
+++ b/app/v2/pkg/engine/argumenttemplate/argument_template.go
@@ -0,0 +1,172 @@
+package argumenttemplate
+
+import (
+ "fmt"
+ "regexp"
+ "strings"
+ "unicode"
+
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
+)
+
+// templateRegex matches argument placeholders such as {{ args.id }} or
+// {{ args.input.nested.value }}; whitespace directly inside the braces is optional.
+// Capture group 1 holds the dotted path that follows "args", including its leading ".".
+var templateRegex = regexp.MustCompile(`{{\s*args((?:\.[a-zA-Z0-9_]+)+)\s*}}`)
+
+// Parse converts template into a resolve.InputTemplate.
+//
+// Text outside of {{ args.<path> }} placeholders is emitted verbatim as static
+// segments. Every placeholder is resolved against the arguments of the field
+// identified by fieldRef via resolveArgumentTemplateSegment and emitted as a
+// variable segment. A template without any placeholder yields exactly one static
+// segment holding the whole input.
+//
+// The first placeholder that cannot be resolved aborts parsing; its error is
+// returned together with a zero InputTemplate.
+func Parse(template string, fieldRef, operationDefinitionRef int, operation, definition *ast.Document) (resolve.InputTemplate, error) {
+	locations := templateRegex.FindAllStringSubmatchIndex(template, -1)
+	if len(locations) == 0 {
+		return resolve.InputTemplate{
+			Segments: []resolve.TemplateSegment{{
+				SegmentType: resolve.StaticSegmentType,
+				Data:        []byte(template),
+			}},
+		}, nil
+	}
+
+	// Upper bound: static text before, between and after every placeholder.
+	parsed := make([]resolve.TemplateSegment, 0, len(locations)*2+1)
+	cursor := 0
+
+	for _, loc := range locations {
+		// One capture group yields four indices: match start/end and group start/end.
+		if len(loc) != 4 {
+			return resolve.InputTemplate{}, fmt.Errorf("invalid argument template")
+		}
+		matchStart, matchEnd, pathStart, pathEnd := loc[0], loc[1], loc[2], loc[3]
+
+		// Emit the literal text between the previous placeholder and this one.
+		if cursor < matchStart {
+			parsed = append(parsed, resolve.TemplateSegment{
+				SegmentType: resolve.StaticSegmentType,
+				Data:        []byte(template[cursor:matchStart]),
+			})
+		}
+
+		// The captured path starts with "."; drop it before splitting into elements.
+		path := strings.Split(template[pathStart:pathEnd][1:], ".")
+		variable, err := resolveArgumentTemplateSegment(path, fieldRef, operationDefinitionRef, operation, definition)
+		if err != nil {
+			return resolve.InputTemplate{}, err
+		}
+		parsed = append(parsed, variable)
+		cursor = matchEnd
+	}
+
+	// Emit whatever literal text trails the last placeholder.
+	if cursor < len(template) {
+		parsed = append(parsed, resolve.TemplateSegment{
+			SegmentType: resolve.StaticSegmentType,
+			Data:        []byte(template[cursor:]),
+		})
+	}
+
+	return resolve.InputTemplate{Segments: parsed}, nil
+}
+
+// ValidateNATSSubject checks subject against the subject rules enforced by this package.
+//
+// It returns an error when the subject is empty, contains any Unicode whitespace,
+// has an empty "."-separated token, uses "*" or ">" as only part of a token, or
+// places the ">" token anywhere but last. It returns nil otherwise.
+func ValidateNATSSubject(subject string) error {
+	if len(subject) == 0 {
+		return fmt.Errorf("subject must not be empty")
+	}
+	if strings.IndexFunc(subject, unicode.IsSpace) >= 0 {
+		return fmt.Errorf("subject must not contain whitespace")
+	}
+
+	parts := strings.Split(subject, ".")
+	last := len(parts) - 1
+	for pos, part := range parts {
+		// Cases are evaluated top to bottom, so the first violated rule is the one reported.
+		switch {
+		case part == "":
+			return fmt.Errorf("subject must not contain empty tokens")
+		case part != ">" && strings.Contains(part, ">"):
+			return fmt.Errorf("token %q must not contain partial wildcard '>'", part)
+		case part != "*" && strings.Contains(part, "*"):
+			return fmt.Errorf("token %q must not contain partial wildcard '*'", part)
+		case part == ">" && pos != last:
+			return fmt.Errorf("token \">\" must be the last token")
+		}
+	}
+
+	return nil
+}
+
+// resolveArgumentTemplateSegment builds the variable segment for a single placeholder.
+//
+// argumentPath is the placeholder path split on "."; its first element names an
+// argument of the field fieldRef in operation, the remaining elements address
+// nested input fields. The argument value must be a variable that is defined on
+// the operation operationDefinitionRef, and the full path must pass
+// validateArgumentPath. The returned segment reads from the context variable
+// path <variable name> followed by the nested elements and uses the plain
+// variable renderer.
+//
+// NOTE(review): literal argument values are rejected here; presumably callers run
+// after normalization has extracted them into variables - confirm.
+func resolveArgumentTemplateSegment(argumentPath []string, fieldRef, operationDefinitionRef int, operation, definition *ast.Document) (resolve.TemplateSegment, error) {
+	var none resolve.TemplateSegment
+	name := argumentPath[0]
+
+	argRef, found := operation.FieldArgument(fieldRef, []byte(name))
+	if !found {
+		return none, fmt.Errorf("field argument %q is not defined", name)
+	}
+
+	value := operation.ArgumentValue(argRef)
+	if value.Kind != ast.ValueKindVariable {
+		return none, fmt.Errorf("expected argument %q kind to be %q but received %q", name, ast.ValueKindVariable, value.Kind)
+	}
+
+	variableName := operation.VariableValueNameString(value.Ref)
+	definitionRef, found := operation.VariableDefinitionByNameAndOperation(operationDefinitionRef, operation.VariableValueNameBytes(value.Ref))
+	if !found {
+		return none, fmt.Errorf("expected definition to exist for variable %q", variableName)
+	}
+
+	typeRef := operation.VariableDefinitions[definitionRef].Type
+	if err := validateArgumentPath(argumentPath, typeRef, operation, definition); err != nil {
+		return none, err
+	}
+
+	// The argument name is replaced by the variable name; nested elements are kept as is.
+	sourcePath := make([]string, 0, len(argumentPath))
+	sourcePath = append(sourcePath, variableName)
+	sourcePath = append(sourcePath, argumentPath[1:]...)
+
+	return resolve.TemplateSegment{
+		SegmentType:        resolve.VariableSegmentType,
+		VariableKind:       resolve.ContextVariableKind,
+		VariableSourcePath: sourcePath,
+		Renderer:           resolve.NewPlainVariableRenderer(),
+	}, nil
+}
+
+// validateArgumentPath verifies that argumentPath ends at a leaf type.
+//
+// variableTypeRef is the type of the variable bound to argumentPath[0] and lives
+// in operation. Each later path element must name an input field of the input
+// object reached so far; the type at the end of the path must satisfy
+// ensureLeafType. Every error is prefixed with the offending template path.
+//
+// NOTE(review): types are compared by the name returned from ResolveTypeNameString;
+// confirm how list-wrapped types are meant to be treated here.
+func validateArgumentPath(argumentPath []string, variableTypeRef int, operation, definition *ast.Document) error {
+	templateName := "args." + strings.Join(argumentPath, ".")
+	typeRef, typeDoc := variableTypeRef, operation
+
+	// Index 0 is the argument itself; only the elements after it are input fields.
+	for depth := 1; depth < len(argumentPath); depth++ {
+		parent, err := inputObjectNodeForTypeRef(typeRef, typeDoc, definition)
+		if err != nil {
+			return fmt.Errorf("argument template %q is invalid: %w", templateName, err)
+		}
+
+		fieldName := argumentPath[depth]
+		fieldRef := definition.InputObjectTypeDefinitionInputValueDefinitionByName(parent.Ref, []byte(fieldName))
+		if fieldRef == -1 {
+			return fmt.Errorf("argument template %q is invalid: input field %q does not exist on %q", templateName, fieldName, definition.InputObjectTypeDefinitionNameString(parent.Ref))
+		}
+
+		// Input field types are stored in the schema document, not in the operation.
+		typeRef, typeDoc = definition.InputValueDefinitions[fieldRef].Type, definition
+	}
+
+	if err := ensureLeafType(typeRef, typeDoc, definition); err != nil {
+		return fmt.Errorf("argument template %q is invalid: %w", templateName, err)
+	}
+	return nil
+}
+
+// inputObjectNodeForTypeRef resolves the type referenced by typeRef inside typeDoc
+// to the input object definition of the same name in definition.
+//
+// It returns an error when no type of that name exists in definition, or when the
+// type exists but is not an input object.
+func inputObjectNodeForTypeRef(typeRef int, typeDoc, definition *ast.Document) (ast.Node, error) {
+	typeName := typeDoc.ResolveTypeNameString(typeRef)
+
+	// Query the index once and branch on the result instead of looking the same
+	// name up a second time for the error path.
+	node, ok := definition.Index.FirstNodeByNameStr(typeName)
+	if !ok {
+		return ast.Node{}, fmt.Errorf("type %q does not exist in the schema", typeName)
+	}
+	if node.Kind != ast.NodeKindInputObjectTypeDefinition {
+		return ast.Node{}, fmt.Errorf("type %q is %q, expected INPUT_OBJECT", typeName, node.Kind)
+	}
+	return node, nil
+}
+
+// ensureLeafType reports an error unless the type referenced by typeRef inside
+// typeDoc is defined in definition as a scalar or an enum.
+//
+// Input objects get a dedicated message; any other definition kind is reported
+// with its kind name.
+func ensureLeafType(typeRef int, typeDoc, definition *ast.Document) error {
+	name := typeDoc.ResolveTypeNameString(typeRef)
+	node, found := definition.Index.FirstNodeByNameStr(name)
+	if !found {
+		return fmt.Errorf("type %q does not exist in the schema", name)
+	}
+
+	kind := node.Kind
+	if kind == ast.NodeKindScalarTypeDefinition || kind == ast.NodeKindEnumTypeDefinition {
+		return nil
+	}
+	if kind == ast.NodeKindInputObjectTypeDefinition {
+		return fmt.Errorf("type %q is an INPUT_OBJECT, expected a leaf type", name)
+	}
+	return fmt.Errorf("type %q is %q, expected a leaf input type", name, kind)
+}
diff --git a/app/v2/pkg/engine/argumenttemplate/argument_template_test.go b/app/v2/pkg/engine/argumenttemplate/argument_template_test.go
new file mode 100644
index 0000000..da35390
--- /dev/null
+++ b/app/v2/pkg/engine/argumenttemplate/argument_template_test.go
@@ -0,0 +1,84 @@
+package argumenttemplate
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform"
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/internal/unsafeparser"
+)
+
+// TestParse exercises Parse against a schema with three levels of nested input
+// objects: one path that resolves to a leaf, one path with an unknown input field,
+// and one path that stops at an input object instead of a leaf.
+func TestParse(t *testing.T) {
+ // Schema fixture: Input -> NestedInput -> DeepInput, each with at least one leaf field.
+ definition := unsafeparser.ParseGraphqlDocumentString(`
+ schema { subscription: Subscription }
+ type Subscription {
+ heroByID(id: ID!): Hero
+ heroByIDInput(input: Input!): Hero
+ }
+ input Input {
+ nested: NestedInput!
+ leaf: String!
+ }
+ input NestedInput {
+ value: ID!
+ sub: DeepInput!
+ }
+ input DeepInput {
+ foo: String!
+ }
+ type Hero {
+ id: ID!
+ }
+ `)
+ // NOTE(review): presumably this adds the built-in scalars (ID, String, ...) so they
+ // can be found in the definition index - confirm.
+ require.NoError(t, asttransform.MergeDefinitionWithBaseSchema(&definition))
+
+ // Both arguments are passed as variables, which is what Parse requires.
+ operation := unsafeparser.ParseGraphqlDocumentString(`
+ subscription Test($id: ID!, $input: Input!) {
+ heroByID(id: $id) { id }
+ heroByIDInput(input: $input) { id }
+ }
+ `)
+
+ // The operation document holds a single operation, addressed below as ref 0.
+ t.Run("valid nested argument path", func(t *testing.T) {
+ fieldRef := findFieldRefByName(t, &operation, "heroByIDInput")
+ template, err := Parse("{{ args.input.nested.value }}", fieldRef, 0, &operation, &definition)
+ require.NoError(t, err)
+ require.Len(t, template.Segments, 1)
+ // The first element is the variable name, which here equals the argument name.
+ assert.Equal(t, []string{"input", "nested", "value"}, template.Segments[0].VariableSourcePath)
+ })
+
+ t.Run("invalid nested argument path", func(t *testing.T) {
+ fieldRef := findFieldRefByName(t, &operation, "heroByIDInput")
+ _, err := Parse("{{ args.input.missing.value }}", fieldRef, 0, &operation, &definition)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), `input field "missing" does not exist on "Input"`)
+ })
+
+ t.Run("invalid non leaf argument path", func(t *testing.T) {
+ fieldRef := findFieldRefByName(t, &operation, "heroByIDInput")
+ _, err := Parse("{{ args.input.nested }}", fieldRef, 0, &operation, &definition)
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), `type "NestedInput" is an INPUT_OBJECT, expected a leaf type`)
+ })
+}
+
+// TestValidateNATSSubject checks a handful of accepted subjects and, for rejected
+// ones, that the error mentions the violated rule.
+func TestValidateNATSSubject(t *testing.T) {
+	for _, subject := range []string{"foo.bar", "foo.*"} {
+		assert.NoError(t, ValidateNATSSubject(subject))
+	}
+
+	rejected := []struct {
+		subject string
+		message string
+	}{
+		{subject: "foo..bar", message: "empty tokens"},
+		{subject: "foo.>.bar", message: `must be the last token`},
+		{subject: "foo.b*ar", message: "partial wildcard"},
+	}
+	for _, tc := range rejected {
+		assert.ErrorContains(t, ValidateNATSSubject(tc.subject), tc.message)
+	}
+}
+
+// findFieldRefByName returns the ref of the first field in operation whose name
+// equals fieldName and fails the test immediately when no such field exists.
+func findFieldRefByName(t *testing.T, operation *ast.Document, fieldName string) int {
+	t.Helper()
+	for ref := 0; ref < len(operation.Fields); ref++ {
+		if operation.FieldNameString(ref) != fieldName {
+			continue
+		}
+		return ref
+	}
+	t.Fatalf("field %q not found", fieldName)
+	// Fatalf stops the test; the return only satisfies the compiler.
+	return -1
+}
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go b/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
index 1cb2914..4391791 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
@@ -3,7 +3,9 @@ package pubsub_datasource
import (
"encoding/json"
"fmt"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
+ "strings"
+
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argumenttemplate"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"slices"
@@ -35,37 +37,34 @@ type NatsEventManager struct {
}
func (p *NatsEventManager) extractEventSubject(ref int, subject string) (string, error) {
- matches := eventSubjectRegex.FindAllStringSubmatch(subject, -1)
- if len(matches) != 1 || len(matches[0]) != 2 {
- return "", fmt.Errorf("expected subject to match regex")
- }
- argumentName := matches[0][1]
- // We need to find the argument in the operation
- argumentRef, ok := p.visitor.Operation.FieldArgument(ref, []byte(argumentName))
- if !ok {
- return "", fmt.Errorf("argument \"%s\" is not defined", argumentName)
- }
- argumentValue := p.visitor.Operation.ArgumentValue(argumentRef)
- if argumentValue.Kind != ast.ValueKindVariable {
- return "", fmt.Errorf("expected argument \"%s\" kind to be \"ValueKindVariable\" but received \"%s\"", argumentName, argumentValue.Kind)
- }
- variableName := p.visitor.Operation.VariableValueNameBytes(argumentValue.Ref)
- variableDefinition, ok := p.visitor.Operation.VariableDefinitionByNameAndOperation(p.visitor.Walker.Ancestors[0].Ref, variableName)
- if !ok {
- return "", fmt.Errorf("expected definition to exist for variable \"%s\"", variableName)
- }
- variableTypeRef := p.visitor.Operation.VariableDefinitions[variableDefinition].Type
- renderer, err := resolve.NewPlainVariableRendererWithValidationFromTypeRef(p.visitor.Operation, p.visitor.Definition, variableTypeRef, string(variableName))
+ inputTemplate, err := argumenttemplate.Parse(subject, ref, p.visitor.Walker.Ancestors[0].Ref, p.visitor.Operation, p.visitor.Definition)
if err != nil {
return "", err
}
- contextVariable := &resolve.ContextVariable{
- Path: []string{string(variableName)},
- Renderer: renderer,
+
+ var subjectBuilder strings.Builder
+ for _, segment := range inputTemplate.Segments {
+ switch segment.SegmentType {
+ case resolve.StaticSegmentType:
+ subjectBuilder.Write(segment.Data)
+ case resolve.VariableSegmentType:
+ contextVariable := &resolve.ContextVariable{
+ Path: segment.VariableSourcePath,
+ Renderer: segment.Renderer,
+ }
+ variablePlaceholder, _ := p.variables.AddVariable(contextVariable)
+ subjectBuilder.WriteString(variablePlaceholder)
+ default:
+ return "", fmt.Errorf("unsupported template segment type: %d", segment.SegmentType)
+ }
}
- // We need to replace the template literal with the variable placeholder (and reuse if it already exists)
- variablePlaceHolder, _ := p.variables.AddVariable(contextVariable) // $$0$$
- return eventSubjectRegex.ReplaceAllLiteralString(subject, variablePlaceHolder), nil
+
+ extractedSubject := subjectBuilder.String()
+ if err := argumenttemplate.ValidateNATSSubject(strings.ReplaceAll(extractedSubject, "$$", "x")); err != nil {
+ return "", fmt.Errorf("invalid NATS subject %q: %w", subject, err)
+ }
+
+ return extractedSubject, nil
}
func (p *NatsEventManager) eventDataBytes(ref int) ([]byte, error) {
@@ -77,15 +76,15 @@ func (p *NatsEventManager) handlePublishAndRequestEvent(ref int) {
p.visitor.Walker.StopWithInternalErr(fmt.Errorf("publish and request events should define one subject but received %d", len(p.eventConfiguration.Subjects)))
return
}
- rawSubject := p.eventConfiguration.Subjects[0]
- extractedSubject, err := p.extractEventSubject(ref, rawSubject)
+ dataBytes, err := p.eventDataBytes(ref)
if err != nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("could not extract event subject: %w", err))
+ p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to write event data bytes: %w", err))
return
}
- dataBytes, err := p.eventDataBytes(ref)
+ rawSubject := p.eventConfiguration.Subjects[0]
+ extractedSubject, err := p.extractEventSubject(ref, rawSubject)
if err != nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to write event data bytes: %w", err))
+ p.visitor.Walker.StopWithInternalErr(fmt.Errorf("could not extract event subject: %w", err))
return
}
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
index 02b05d3..4c62615 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
@@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
- "regexp"
"strings"
"github.com/jensneuse/abstractlogger"
@@ -23,8 +22,6 @@ const (
EventTypeSubscribe EventType = "subscribe"
)
-var eventSubjectRegex = regexp.MustCompile(`{{ args.([a-zA-Z0-9_]+) }}`)
-
func EventTypeFromString(s string) (EventType, error) {
et := EventType(strings.ToLower(s))
switch et {
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
index 2c817b1..b11c8f7 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
+++ b/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
@@ -41,9 +41,15 @@ func TestPubSub(t *testing.T) {
helloMutation(id: String!, input: String!): String! @eventsPublish(subject: "helloMutation.{{ args.id }}")
}
+ input CompoundInput {
+ firstId: String!
+ }
+
type Subscription {
helloSubscription(id: String!): String! @eventsSubscribe(subjects: ["helloSubscription.{{ args.id }}"])
subscriptionWithMultipleSubjects(firstId: String!, secondId: String!): String! @eventsSubscribe(subjects: ["firstSubscription.{{ args.firstId }}", "secondSubscription.{{ args.secondId }}"])
+ subscriptionWithCompoundSubject(firstId: String!, secondId: String!): String! @eventsSubscribe(subjects: ["compound.{{ args.firstId }}.{{ args.secondId }}"])
+ subscriptionWithNestedSubject(input: CompoundInput!): String! @eventsSubscribe(subjects: ["nested.{{ args.input.firstId }}"])
}`
dataSourceCustomConfig := Configuration{
@@ -92,6 +98,28 @@ func TestPubSub(t *testing.T) {
Subjects: []string{"firstSubscription.{{ args.firstId }}", "secondSubscription.{{ args.secondId }}"},
},
},
+ {
+ Metadata: &EventMetadata{
+ ProviderID: "default",
+ FieldName: "subscriptionWithCompoundSubject",
+ Type: EventTypeSubscribe,
+ TypeName: "Subscription",
+ },
+ Configuration: &NatsEventConfiguration{
+ Subjects: []string{"compound.{{ args.firstId }}.{{ args.secondId }}"},
+ },
+ },
+ {
+ Metadata: &EventMetadata{
+ ProviderID: "default",
+ FieldName: "subscriptionWithNestedSubject",
+ Type: EventTypeSubscribe,
+ TypeName: "Subscription",
+ },
+ Configuration: &NatsEventConfiguration{
+ Subjects: []string{"nested.{{ args.input.firstId }}"},
+ },
+ },
},
}
@@ -116,6 +144,10 @@ func TestPubSub(t *testing.T) {
TypeName: "Subscription",
FieldNames: []string{"subscriptionWithMultipleSubjects"},
},
+ {
+ TypeName: "Subscription",
+ FieldNames: []string{"subscriptionWithCompoundSubject", "subscriptionWithNestedSubject"},
+ },
},
},
dataSourceCustomConfig,
@@ -171,6 +203,30 @@ func TestPubSub(t *testing.T) {
},
},
},
+ {
+ TypeName: "Subscription",
+ FieldName: "subscriptionWithCompoundSubject",
+ Arguments: []plan.ArgumentConfiguration{
+ {
+ Name: "firstId",
+ SourceType: plan.FieldArgumentSource,
+ },
+ {
+ Name: "secondId",
+ SourceType: plan.FieldArgumentSource,
+ },
+ },
+ },
+ {
+ TypeName: "Subscription",
+ FieldName: "subscriptionWithNestedSubject",
+ Arguments: []plan.ArgumentConfiguration{
+ {
+ Name: "input",
+ SourceType: plan.FieldArgumentSource,
+ },
+ },
+ },
},
DisableResolveFieldPositions: true,
}
@@ -265,7 +321,7 @@ func TestPubSub(t *testing.T) {
Variables: resolve.Variables{
&resolve.ContextVariable{
Path: []string{"a"},
- Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+ Renderer: resolve.NewPlainVariableRenderer(),
},
},
Source: &NatsSubscriptionSource{
@@ -302,11 +358,11 @@ func TestPubSub(t *testing.T) {
Variables: resolve.Variables{
&resolve.ContextVariable{
Path: []string{"a"},
- Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+ Renderer: resolve.NewPlainVariableRenderer(),
},
&resolve.ContextVariable{
Path: []string{"b"},
- Renderer: resolve.NewPlainVariableRendererWithValidation(`{"type":["string"]}`),
+ Renderer: resolve.NewPlainVariableRenderer(),
},
},
Source: &NatsSubscriptionSource{
@@ -332,4 +388,82 @@ func TestPubSub(t *testing.T) {
}
datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
})
+
+ t.Run("subscription with compound subject", func(t *testing.T) {
+ const operation = "subscription SubscriptionWithCompoundSubject { subscriptionWithCompoundSubject(firstId: 11, secondId: 23) }"
+ const operationName = `SubscriptionWithCompoundSubject`
+ expect := &plan.SubscriptionResponsePlan{
+ Response: &resolve.GraphQLSubscription{
+ Trigger: resolve.GraphQLSubscriptionTrigger{
+ Input: []byte(`{"providerId":"default","subjects":["compound.$$0$$.$$1$$"]}`),
+ Variables: resolve.Variables{
+ &resolve.ContextVariable{
+ Path: []string{"a"},
+ Renderer: resolve.NewPlainVariableRenderer(),
+ },
+ &resolve.ContextVariable{
+ Path: []string{"b"},
+ Renderer: resolve.NewPlainVariableRenderer(),
+ },
+ },
+ Source: &NatsSubscriptionSource{
+ pubSub: &testPubsub{},
+ },
+ PostProcessing: resolve.PostProcessingConfiguration{
+ MergePath: []string{"subscriptionWithCompoundSubject"},
+ },
+ },
+ Response: &resolve.GraphQLResponse{
+ Data: &resolve.Object{
+ Fields: []*resolve.Field{
+ {
+ Name: []byte("subscriptionWithCompoundSubject"),
+ Value: &resolve.String{
+ Path: []string{"subscriptionWithCompoundSubject"},
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+ datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
+ })
+
+ t.Run("subscription with nested subject", func(t *testing.T) {
+ const operation = "subscription SubscriptionWithNestedSubject { subscriptionWithNestedSubject(input: { firstId: \"11\" }) }"
+ const operationName = `SubscriptionWithNestedSubject`
+ expect := &plan.SubscriptionResponsePlan{
+ Response: &resolve.GraphQLSubscription{
+ Trigger: resolve.GraphQLSubscriptionTrigger{
+ Input: []byte(`{"providerId":"default","subjects":["nested.$$0$$"]}`),
+ Variables: resolve.Variables{
+ &resolve.ContextVariable{
+ Path: []string{"a", "firstId"},
+ Renderer: resolve.NewPlainVariableRenderer(),
+ },
+ },
+ Source: &NatsSubscriptionSource{
+ pubSub: &testPubsub{},
+ },
+ PostProcessing: resolve.PostProcessingConfiguration{
+ MergePath: []string{"subscriptionWithNestedSubject"},
+ },
+ },
+ Response: &resolve.GraphQLResponse{
+ Data: &resolve.Object{
+ Fields: []*resolve.Field{
+ {
+ Name: []byte("subscriptionWithNestedSubject"),
+ Value: &resolve.String{
+ Path: []string{"subscriptionWithNestedSubject"},
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+ datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
+ })
}
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/plan/configuration_visitor.go b/app/v2/pkg/engine/plan/configuration_visitor.go
index b5e2ed1..56384c3 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/plan/configuration_visitor.go
+++ b/app/v2/pkg/engine/plan/configuration_visitor.go
@@ -3,7 +3,6 @@ package plan
import (
"bytes"
"fmt"
- "regexp"
"slices"
"strings"
@@ -12,6 +11,7 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
"github.com/wundergraph/graphql-go-tools/v2/pkg/astvisitor"
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argumenttemplate"
"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
)
@@ -870,13 +870,6 @@ func (c *configurationVisitor) buildSubscriptionFilterCondition(condition Subscr
return filter
}
-var (
- // subscriptionFieldFilterRegex is used to extract the variable name from the subscription filter condition
- // e.g. {{ args.id }} -> id
- // e.g. {{ args.input.id }} -> input.id
- subscriptionFieldFilterRegex = regexp.MustCompile(`{{\s*args((?:\.[a-zA-Z0-9_]+)+)\s*}}`)
-)
-
// ContainsTemplateString checks if the value contains a template string
func ContainsTemplateString(value []byte) bool {
return bytes.Contains(value, []byte("{{"))
@@ -887,68 +880,17 @@ func (c *configurationVisitor) buildSubscriptionFieldFilter(condition *Subscript
filter.FieldPath = condition.FieldPath
filter.Values = make([]resolve.InputTemplate, len(condition.Values))
for i, value := range condition.Values {
- matches := subscriptionFieldFilterRegex.FindAllStringSubmatchIndex(value, -1)
- if len(matches) == 0 {
- filter.Values[i].Segments = []resolve.TemplateSegment{
- {
- SegmentType: resolve.StaticSegmentType,
- Data: []byte(value),
- },
- }
- continue
+ inputTemplate, err := argumenttemplate.Parse(value, c.fieldRef, c.walker.Ancestors[0].Ref, c.operation, c.definition)
+ if err != nil {
+ c.walker.StopWithInternalErr(err)
+ return nil
}
- if len(matches) == 1 && len(matches[0]) == 4 {
- prefix := value[:matches[0][0]]
- hasPrefix := len(prefix) > 0
- // the path begins with ".", so ignore the first empty string element with trailing [1:]
- argumentPath := strings.Split(value[matches[0][2]:matches[0][3]][1:], ".")
- argumentName := argumentPath[0]
- argumentRef, ok := c.operation.FieldArgument(c.fieldRef, []byte(argumentName))
- if !ok {
- c.walker.StopWithInternalErr(fmt.Errorf(`field argument "%s" is not defined`, argumentName))
- return nil
- }
- argumentValue := c.operation.ArgumentValue(argumentRef)
- if argumentValue.Kind != ast.ValueKindVariable {
- c.walker.StopWithInternalErr(fmt.Errorf(`expected argument "%s" kind to be "ValueKindVariable" but received "%s"`, argumentName, argumentValue.Kind))
- return nil
- }
- variableName := c.operation.VariableValueNameString(argumentValue.Ref)
- // the variable path should be the variable name, e.g., "a", and then the 2nd element from the path onwards
- variablePath := append([]string{variableName}, argumentPath[1:]...)
- suffix := value[matches[0][1]:]
- hasSuffix := len(suffix) > 0
- size := 1
- if hasPrefix {
- size++
- }
- if hasSuffix {
- size++
+ for j := range inputTemplate.Segments {
+ if inputTemplate.Segments[j].SegmentType == resolve.VariableSegmentType {
+ inputTemplate.Segments[j].Renderer = resolve.NewPlainVariableRenderer()
}
- filter.Values[i].Segments = make([]resolve.TemplateSegment, size)
- idx := 0
- if hasPrefix {
- filter.Values[i].Segments[idx] = resolve.TemplateSegment{
- SegmentType: resolve.StaticSegmentType,
- Data: []byte(prefix),
- }
- idx++
- }
- filter.Values[i].Segments[idx] = resolve.TemplateSegment{
- SegmentType: resolve.VariableSegmentType,
- VariableKind: resolve.ContextVariableKind,
- Renderer: resolve.NewPlainVariableRenderer(),
- VariableSourcePath: variablePath,
- }
- if hasSuffix {
- filter.Values[i].Segments[idx+1] = resolve.TemplateSegment{
- SegmentType: resolve.StaticSegmentType,
- Data: []byte(suffix),
- }
- }
- continue
}
- return nil
+ filter.Values[i] = inputTemplate
}
return filter
}
diff --git a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/plan/subscription_filter_test.go b/app/v2/pkg/engine/plan/subscription_filter_test.go
index d6c51d9..d6eed77 100644
--- a/tmp/agent-patch-flux-pr-817.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/plan/subscription_filter_test.go
+++ b/app/v2/pkg/engine/plan/subscription_filter_test.go
@@ -262,22 +262,49 @@ func TestPlanSubscriptionFilter(t *testing.T) {
},
))
- t.Run("subscription with IN field invalid filter multiple templates", test(
+ t.Run("subscription with IN field filter multiple templates", test(
schema, `
- subscription { heroByID(id: "1") { id name } }
+ subscription { heroByIDMultipleArgs(one: 1, two: 2.5) { id name } }
`, "",
&SubscriptionResponsePlan{
Response: &resolve.GraphQLSubscription{
Trigger: resolve.GraphQLSubscriptionTrigger{
Input: []byte{},
},
+ Filter: &resolve.SubscriptionFilter{
+ In: &resolve.SubscriptionFieldFilter{
+ FieldPath: []string{"id"},
+ Values: []resolve.InputTemplate{
+ {
+ Segments: []resolve.TemplateSegment{
+ {
+ SegmentType: resolve.VariableSegmentType,
+ VariableKind: resolve.ContextVariableKind,
+ VariableSourcePath: []string{"a"},
+ Renderer: resolve.NewPlainVariableRenderer(),
+ },
+ {
+ SegmentType: resolve.StaticSegmentType,
+ Data: []byte("."),
+ },
+ {
+ SegmentType: resolve.VariableSegmentType,
+ VariableKind: resolve.ContextVariableKind,
+ VariableSourcePath: []string{"b"},
+ Renderer: resolve.NewPlainVariableRenderer(),
+ },
+ },
+ },
+ },
+ },
+ },
Response: &resolve.GraphQLResponse{
Data: &resolve.Object{
Fields: []*resolve.Field{
{
- Name: []byte("heroByID"),
+ Name: []byte("heroByIDMultipleArgs"),
Value: &resolve.Object{
- Path: []string{"heroByID"},
+ Path: []string{"heroByIDMultipleArgs"},
Nullable: true,
Fields: []*resolve.Field{
{
@@ -306,19 +333,24 @@ func TestPlanSubscriptionFilter(t *testing.T) {
Fields: []FieldConfiguration{
{
TypeName: "Subscription",
- FieldName: "heroByID",
- Path: []string{"heroByID"},
+ FieldName: "heroByIDMultipleArgs",
+ Path: []string{"heroByIDMultipleArgs"},
Arguments: []ArgumentConfiguration{
{
- Name: "id",
+ Name: "one",
SourceType: FieldArgumentSource,
- SourcePath: []string{"id"},
+ SourcePath: []string{"one"},
+ },
+ {
+ Name: "two",
+ SourceType: FieldArgumentSource,
+ SourcePath: []string{"two"},
},
},
SubscriptionFilterCondition: &SubscriptionFilterCondition{
In: &SubscriptionFieldCondition{
FieldPath: []string{"id"},
- Values: []string{"{{ args.a }}.{{ args.b }}"},
+ Values: []string{"{{ args.one }}.{{ args.two }}"},
},
},
},