agent_patch
runs/2026-02-28__13-38-10__gpt-5-4/flux-pr-1309/flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/agent-logs/agent.patch
74322 bytes
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/graphql_datasource/configuration.go b/app/v2/pkg/engine/datasource/graphql_datasource/configuration.go
index c5a7f3f..ef4fc26 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/graphql_datasource/configuration.go
+++ b/app/v2/pkg/engine/datasource/graphql_datasource/configuration.go
@@ -9,6 +9,7 @@ import (
"github.com/wundergraph/graphql-go-tools/v2/pkg/astparser"
"github.com/wundergraph/graphql-go-tools/v2/pkg/asttransform"
grpcdatasource "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/grpc_datasource"
+ "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
"github.com/wundergraph/graphql-go-tools/v2/pkg/federation"
"github.com/wundergraph/graphql-go-tools/v2/pkg/operationreport"
)
@@ -103,6 +104,8 @@ type SingleTypeField struct {
FieldName string
}
+type SubscriptionOnStartFn func(ctx resolve.StartupHookContext, input []byte) (err error)
+
type SubscriptionConfiguration struct {
URL string
Header http.Header
@@ -119,6 +122,7 @@ type SubscriptionConfiguration struct {
// these headers by itself.
ForwardedClientHeaderRegularExpressions []RegularExpression
WsSubProtocol string
+ StartupHooks []SubscriptionOnStartFn
}
type FetchConfiguration struct {
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
index 11c61d9..97eaa61 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
+++ b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource.go
@@ -449,7 +449,8 @@ func (p *Planner[T]) ConfigureSubscription() plan.SubscriptionConfiguration {
return plan.SubscriptionConfiguration{
Input: string(input),
DataSource: &SubscriptionSource{
- client: p.subscriptionClient,
+ client: p.subscriptionClient,
+ subscriptionOnStartFns: p.config.subscription.StartupHooks,
},
Variables: p.variables,
PostProcessing: DefaultPostProcessingConfiguration,
@@ -1953,7 +1954,8 @@ type RegularExpression struct {
}
type SubscriptionSource struct {
- client GraphQLSubscriptionClient
+ client GraphQLSubscriptionClient
+ subscriptionOnStartFns []SubscriptionOnStartFn
}
func (s *SubscriptionSource) AsyncStart(ctx *resolve.Context, id uint64, input []byte, updater resolve.SubscriptionUpdater) error {
@@ -2003,3 +2005,13 @@ func (s *SubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte,
}
return s.client.UniqueRequestID(ctx, options, xxh)
}
+
+func (s *SubscriptionSource) SubscriptionOnStart(ctx resolve.StartupHookContext, input []byte) error {
+ for _, fn := range s.subscriptionOnStartFns {
+ err := fn(ctx, input)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
index 7f41943..118af09 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
+++ b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
@@ -4006,7 +4006,7 @@ func TestGraphQLDataSource(t *testing.T) {
Trigger: resolve.GraphQLSubscriptionTrigger{
Input: []byte(`{"url":"wss://swapi.com/graphql","body":{"query":"subscription{remainingJedis}"}}`),
Source: &SubscriptionSource{
- NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
+ client: NewGraphQLSubscriptionClient(http.DefaultClient, http.DefaultClient, ctx),
},
PostProcessing: DefaultPostProcessingConfiguration,
},
@@ -8272,6 +8272,9 @@ func (t *testSubscriptionUpdaterChan) Update(data []byte) {
t.updates <- string(data)
}
+func (t *testSubscriptionUpdaterChan) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) {
+}
+
func (t *testSubscriptionUpdaterChan) Complete() {
close(t.complete)
}
@@ -8280,6 +8283,13 @@ func (t *testSubscriptionUpdaterChan) Close(kind resolve.SubscriptionCloseKind)
t.closed <- kind
}
+func (t *testSubscriptionUpdaterChan) CloseSubscription(kind resolve.SubscriptionCloseKind, id resolve.SubscriptionIdentifier) {
+}
+
+func (t *testSubscriptionUpdaterChan) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier {
+ return map[context.Context]resolve.SubscriptionIdentifier{}
+}
+
func (t *testSubscriptionUpdaterChan) AwaitUpdateWithT(tt *testing.T, timeout time.Duration, f func(t *testing.T, update string), msgAndArgs ...any) {
tt.Helper()
@@ -8397,6 +8407,16 @@ func (t *testSubscriptionUpdater) Close(kind resolve.SubscriptionCloseKind) {
t.closed = true
}
+func (t *testSubscriptionUpdater) CloseSubscription(kind resolve.SubscriptionCloseKind, id resolve.SubscriptionIdentifier) {
+}
+
+func (t *testSubscriptionUpdater) Subscriptions() map[context.Context]resolve.SubscriptionIdentifier {
+ return map[context.Context]resolve.SubscriptionIdentifier{}
+}
+
+func (t *testSubscriptionUpdater) UpdateSubscription(id resolve.SubscriptionIdentifier, data []byte) {
+}
+
func TestSubscriptionSource_Start(t *testing.T) {
chatServer := httptest.NewServer(subscriptiontesting.ChatGraphQLEndpointHandler())
defer chatServer.Close()
@@ -8970,6 +8990,53 @@ func TestSanitizeKey(t *testing.T) {
}
}
+func TestSubscriptionSource_SubscriptionOnStart(t *testing.T) {
+ t.Run("calls startup hooks", func(t *testing.T) {
+ ctx := resolve.StartupHookContext{
+ Context: context.Background(),
+ Updater: func(data []byte) {},
+ }
+ called := make(chan []byte, 1)
+ source := SubscriptionSource{
+ subscriptionOnStartFns: []SubscriptionOnStartFn{
+ func(ctx resolve.StartupHookContext, input []byte) error {
+ called <- input
+ return nil
+ },
+ },
+ }
+
+ input := []byte(`{"variables": {}, "extensions": {}, "operationName": "LiveMessages", "query": "subscription LiveMessages { messageAdded(roomName: \"#test\") { text createdBy } }"}`)
+ err := source.SubscriptionOnStart(ctx, input)
+ require.NoError(t, err)
+
+ select {
+ case got := <-called:
+ assert.Equal(t, input, got)
+ case <-time.After(time.Second):
+ t.Fatal("SubscriptionOnStartFn was not called")
+ }
+ })
+
+ t.Run("returns first startup hook error", func(t *testing.T) {
+ ctx := resolve.StartupHookContext{
+ Context: context.Background(),
+ Updater: func(data []byte) {},
+ }
+ source := SubscriptionSource{
+ subscriptionOnStartFns: []SubscriptionOnStartFn{
+ func(ctx resolve.StartupHookContext, input []byte) error {
+ return errors.New("test error")
+ },
+ },
+ }
+
+ err := source.SubscriptionOnStart(ctx, []byte(`{"query":"subscription { messageAdded }"}`))
+ require.Error(t, err)
+ assert.ErrorContains(t, err, "test error")
+ })
+}
+
const interfaceSelectionSchema = `
scalar String
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go
deleted file mode 100644
index 80b02e1..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package pubsub_datasource
-
-import (
- "encoding/json"
- "fmt"
- "slices"
-
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-type KafkaSubscriptionEventConfiguration struct {
- ProviderID string `json:"providerId"`
- Topics []string `json:"topics"`
-}
-
-type KafkaPublishEventConfiguration struct {
- ProviderID string `json:"providerId"`
- Topic string `json:"topic"`
- Data json.RawMessage `json:"data"`
-}
-
-func (s *KafkaPublishEventConfiguration) MarshalJSONTemplate() string {
- return fmt.Sprintf(`{"topic":"%s", "data": %s, "providerId":"%s"}`, s.Topic, s.Data, s.ProviderID)
-}
-
-type KafkaEventManager struct {
- visitor *plan.Visitor
- variables *resolve.Variables
- eventMetadata EventMetadata
- eventConfiguration *KafkaEventConfiguration
- publishEventConfiguration *KafkaPublishEventConfiguration
- subscriptionEventConfiguration *KafkaSubscriptionEventConfiguration
-}
-
-func (p *KafkaEventManager) eventDataBytes(ref int) ([]byte, error) {
- return buildEventDataBytes(ref, p.visitor, p.variables)
-}
-
-func (p *KafkaEventManager) handlePublishEvent(ref int) {
- if len(p.eventConfiguration.Topics) != 1 {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("publish and request events should define one subject but received %d", len(p.eventConfiguration.Topics)))
- return
- }
- topic := p.eventConfiguration.Topics[0]
- dataBytes, err := p.eventDataBytes(ref)
- if err != nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to write event data bytes: %w", err))
- return
- }
-
- p.publishEventConfiguration = &KafkaPublishEventConfiguration{
- ProviderID: p.eventMetadata.ProviderID,
- Topic: topic,
- Data: dataBytes,
- }
-}
-
-func (p *KafkaEventManager) handleSubscriptionEvent(ref int) {
-
- if len(p.eventConfiguration.Topics) == 0 {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("expected at least one subscription topic but received %d", len(p.eventConfiguration.Topics)))
- return
- }
-
- slices.Sort(p.eventConfiguration.Topics)
-
- p.subscriptionEventConfiguration = &KafkaSubscriptionEventConfiguration{
- ProviderID: p.eventMetadata.ProviderID,
- Topics: p.eventConfiguration.Topics,
- }
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
deleted file mode 100644
index 04fc7e7..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
+++ /dev/null
@@ -1,189 +0,0 @@
-package pubsub_datasource
-
-import (
- "encoding/json"
- "fmt"
- "regexp"
- "slices"
- "strings"
-
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argument_templates"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-const (
- fwc = '>'
- tsep = "."
-)
-
-// A variable template has form $$number$$ where the number can range from one to multiple digits
-var (
- variableTemplateRegex = regexp.MustCompile(`\$\$\d+\$\$`)
-)
-
-type NatsSubscriptionEventConfiguration struct {
- ProviderID string `json:"providerId"`
- Subjects []string `json:"subjects"`
- StreamConfiguration *NatsStreamConfiguration `json:"streamConfiguration,omitempty"`
-}
-
-type NatsPublishAndRequestEventConfiguration struct {
- ProviderID string `json:"providerId"`
- Subject string `json:"subject"`
- Data json.RawMessage `json:"data"`
-}
-
-func (s *NatsPublishAndRequestEventConfiguration) MarshalJSONTemplate() string {
- return fmt.Sprintf(`{"subject":"%s", "data": %s, "providerId":"%s"}`, s.Subject, s.Data, s.ProviderID)
-}
-
-type NatsEventManager struct {
- visitor *plan.Visitor
- variables *resolve.Variables
- eventMetadata EventMetadata
- eventConfiguration *NatsEventConfiguration
- publishAndRequestEventConfiguration *NatsPublishAndRequestEventConfiguration
- subscriptionEventConfiguration *NatsSubscriptionEventConfiguration
-}
-
-func isValidNatsSubject(subject string) bool {
- if subject == "" {
- return false
- }
- sfwc := false
- tokens := strings.Split(subject, tsep)
- for _, t := range tokens {
- length := len(t)
- if length == 0 || sfwc {
- return false
- }
- if length > 1 {
- if strings.ContainsAny(t, "\t\n\f\r ") {
- return false
- }
- continue
- }
- switch t[0] {
- case fwc:
- sfwc = true
- case ' ', '\t', '\n', '\r', '\f':
- return false
- }
- }
- return true
-}
-
-func (p *NatsEventManager) addContextVariableByArgumentRef(argumentRef int, argumentPath []string) (string, error) {
- variablePath, err := p.visitor.Operation.VariablePathByArgumentRefAndArgumentPath(argumentRef, argumentPath, p.visitor.Walker.Ancestors[0].Ref)
- if err != nil {
- return "", err
- }
- /* The definition is passed as both definition and operation below because getJSONRootType resolves the type
- * from the first argument, but finalInputValueTypeRef comes from the definition
- */
- contextVariable := &resolve.ContextVariable{
- Path: variablePath,
- Renderer: resolve.NewPlainVariableRenderer(),
- }
- variablePlaceHolder, _ := p.variables.AddVariable(contextVariable)
- return variablePlaceHolder, nil
-}
-
-func (p *NatsEventManager) extractEventSubject(fieldRef int, subject string) (string, error) {
- matches := argument_templates.ArgumentTemplateRegex.FindAllStringSubmatch(subject, -1)
- // If no argument templates are defined, there are only static values
- if len(matches) < 1 {
- if isValidNatsSubject(subject) {
- return subject, nil
- }
- return "", fmt.Errorf(`subject "%s" is not a valid NATS subject`, subject)
- }
- fieldNameBytes := p.visitor.Operation.FieldNameBytes(fieldRef)
- // TODO: handling for interfaces and unions
- fieldDefinitionRef, ok := p.visitor.Definition.ObjectTypeDefinitionFieldWithName(p.visitor.Walker.EnclosingTypeDefinition.Ref, fieldNameBytes)
- if !ok {
- return "", fmt.Errorf(`expected field definition to exist for field "%s"`, fieldNameBytes)
- }
- subjectWithVariableTemplateReplacements := subject
- for templateNumber, groups := range matches {
- // The first group is the whole template; the second is the period delimited argument path
- if len(groups) != 2 {
- return "", fmt.Errorf(`argument template #%d defined on field "%s" is invalid: expected 2 matching groups but received %d`, templateNumber+1, fieldNameBytes, len(groups)-1)
- }
- validationResult, err := argument_templates.ValidateArgumentPath(p.visitor.Definition, groups[1], fieldDefinitionRef)
- if err != nil {
- return "", fmt.Errorf(`argument template #%d defined on field "%s" is invalid: %w`, templateNumber+1, fieldNameBytes, err)
- }
- argumentNameBytes := []byte(validationResult.ArgumentPath[0])
- argumentRef, ok := p.visitor.Operation.FieldArgument(fieldRef, argumentNameBytes)
- if !ok {
- return "", fmt.Errorf(`operation field "%s" does not define argument "%s"`, fieldNameBytes, argumentNameBytes)
- }
- // variablePlaceholder has the form $$0$$, $$1$$, etc.
- variablePlaceholder, err := p.addContextVariableByArgumentRef(argumentRef, validationResult.ArgumentPath)
- if err != nil {
- return "", fmt.Errorf(`failed to retrieve variable placeholder for argument ""%s" defined on operation field "%s": %w`, argumentNameBytes, fieldNameBytes, err)
- }
- // Replace the template literal with the variable placeholder (and reuse the variable if it already exists)
- subjectWithVariableTemplateReplacements = strings.ReplaceAll(subjectWithVariableTemplateReplacements, groups[0], variablePlaceholder)
- }
- // Substitute the variable templates for dummy values to check naïvely that the string is a valid NATS subject
- if isValidNatsSubject(variableTemplateRegex.ReplaceAllLiteralString(subjectWithVariableTemplateReplacements, "a")) {
- return subjectWithVariableTemplateReplacements, nil
- }
- return "", fmt.Errorf(`subject "%s" is not a valid NATS subject`, subject)
-}
-
-func (p *NatsEventManager) eventDataBytes(ref int) ([]byte, error) {
- return buildEventDataBytes(ref, p.visitor, p.variables)
-}
-
-func (p *NatsEventManager) handlePublishAndRequestEvent(ref int) {
- if len(p.eventConfiguration.Subjects) != 1 {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("publish and request events should define one subject but received %d", len(p.eventConfiguration.Subjects)))
- return
- }
- rawSubject := p.eventConfiguration.Subjects[0]
- extractedSubject, err := p.extractEventSubject(ref, rawSubject)
- if err != nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("could not extract event subject: %w", err))
- return
- }
- dataBytes, err := p.eventDataBytes(ref)
- if err != nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to write event data bytes: %w", err))
- return
- }
-
- p.publishAndRequestEventConfiguration = &NatsPublishAndRequestEventConfiguration{
- ProviderID: p.eventMetadata.ProviderID,
- Subject: extractedSubject,
- Data: dataBytes,
- }
-}
-
-func (p *NatsEventManager) handleSubscriptionEvent(ref int) {
-
- if len(p.eventConfiguration.Subjects) == 0 {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("expected at least one subscription subject but received %d", len(p.eventConfiguration.Subjects)))
- return
- }
- extractedSubjects := make([]string, 0, len(p.eventConfiguration.Subjects))
- for _, rawSubject := range p.eventConfiguration.Subjects {
- extractedSubject, err := p.extractEventSubject(ref, rawSubject)
- if err != nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("could not extract subscription event subjects: %w", err))
- return
- }
- extractedSubjects = append(extractedSubjects, extractedSubject)
- }
-
- slices.Sort(extractedSubjects)
-
- p.subscriptionEventConfiguration = &NatsSubscriptionEventConfiguration{
- ProviderID: p.eventMetadata.ProviderID,
- Subjects: extractedSubjects,
- StreamConfiguration: p.eventConfiguration.StreamConfiguration,
- }
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
deleted file mode 100644
index bdf1230..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
+++ /dev/null
@@ -1,346 +0,0 @@
-package pubsub_datasource
-
-import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "regexp"
- "strings"
-
- "github.com/jensneuse/abstractlogger"
-
- "github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-type EventType string
-
-const (
- EventTypePublish EventType = "publish"
- EventTypeRequest EventType = "request"
- EventTypeSubscribe EventType = "subscribe"
-)
-
-var eventSubjectRegex = regexp.MustCompile(`{{ args.([a-zA-Z0-9_]+) }}`)
-
-func EventTypeFromString(s string) (EventType, error) {
- et := EventType(strings.ToLower(s))
- switch et {
- case EventTypePublish, EventTypeRequest, EventTypeSubscribe:
- return et, nil
- default:
- return "", fmt.Errorf("invalid event type: %q", s)
- }
-}
-
-type EventMetadata struct {
- ProviderID string `json:"providerId"`
- Type EventType `json:"type"`
- TypeName string `json:"typeName"`
- FieldName string `json:"fieldName"`
-}
-
-type EventConfiguration struct {
- Metadata *EventMetadata `json:"metadata"`
- Configuration any `json:"configuration"`
-}
-
-type Configuration struct {
- Events []EventConfiguration `json:"events"`
-}
-
-type Planner[T Configuration] struct {
- id int
- config Configuration
- natsPubSubByProviderID map[string]NatsPubSub
- kafkaPubSubByProviderID map[string]KafkaPubSub
- eventManager any
- rootFieldRef int
- variables resolve.Variables
- visitor *plan.Visitor
-}
-
-func (p *Planner[T]) SetID(id int) {
- p.id = id
-}
-
-func (p *Planner[T]) ID() (id int) {
- return p.id
-}
-
-func (p *Planner[T]) EnterField(ref int) {
- if p.rootFieldRef != -1 {
- // This is a nested field; nothing needs to be done
- return
- }
- p.rootFieldRef = ref
-
- fieldName := p.visitor.Operation.FieldNameString(ref)
- typeName := p.visitor.Walker.EnclosingTypeDefinition.NameString(p.visitor.Definition)
-
- var eventConfig *EventConfiguration
- for _, cfg := range p.config.Events {
- if cfg.Metadata.TypeName == typeName && cfg.Metadata.FieldName == fieldName {
- eventConfig = &cfg
- break
- }
- }
- if eventConfig == nil {
- return
- }
-
- switch v := eventConfig.Configuration.(type) {
- case *NatsEventConfiguration:
- em := &NatsEventManager{
- visitor: p.visitor,
- variables: &p.variables,
- eventMetadata: *eventConfig.Metadata,
- eventConfiguration: v,
- }
- p.eventManager = em
-
- switch eventConfig.Metadata.Type {
- case EventTypePublish, EventTypeRequest:
- em.handlePublishAndRequestEvent(ref)
- case EventTypeSubscribe:
- em.handleSubscriptionEvent(ref)
- default:
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("invalid EventType \"%s\" for Nats", eventConfig.Metadata.Type))
- }
- case *KafkaEventConfiguration:
- em := &KafkaEventManager{
- visitor: p.visitor,
- variables: &p.variables,
- eventMetadata: *eventConfig.Metadata,
- eventConfiguration: v,
- }
- p.eventManager = em
-
- switch eventConfig.Metadata.Type {
- case EventTypePublish:
- em.handlePublishEvent(ref)
- case EventTypeSubscribe:
- em.handleSubscriptionEvent(ref)
- default:
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("invalid EventType \"%s\" for Kafka", eventConfig.Metadata.Type))
- }
- default:
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("invalid event configuration type: %T", v))
- }
-}
-
-func (p *Planner[T]) EnterDocument(_, _ *ast.Document) {
- p.rootFieldRef = -1
- p.eventManager = nil
-}
-
-func (p *Planner[T]) Register(visitor *plan.Visitor, configuration plan.DataSourceConfiguration[T], dataSourcePlannerConfiguration plan.DataSourcePlannerConfiguration) error {
- p.visitor = visitor
- visitor.Walker.RegisterEnterFieldVisitor(p)
- visitor.Walker.RegisterEnterDocumentVisitor(p)
- p.config = Configuration(configuration.CustomConfiguration())
- return nil
-}
-
-func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
- if p.eventManager == nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure fetch: event manager is nil"))
- return resolve.FetchConfiguration{}
- }
-
- var dataSource resolve.DataSource
-
- switch v := p.eventManager.(type) {
- case *NatsEventManager:
- pubsub, ok := p.natsPubSubByProviderID[v.eventMetadata.ProviderID]
- if !ok {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("no pubsub connection exists with provider id \"%s\"", v.eventMetadata.ProviderID))
- return resolve.FetchConfiguration{}
- }
-
- switch v.eventMetadata.Type {
- case EventTypePublish:
- dataSource = &NatsPublishDataSource{
- pubSub: pubsub,
- }
- case EventTypeRequest:
- dataSource = &NatsRequestDataSource{
- pubSub: pubsub,
- }
- default:
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure fetch: invalid event type \"%s\" for Nats", v.eventMetadata.Type))
- return resolve.FetchConfiguration{}
- }
-
- return resolve.FetchConfiguration{
- Input: v.publishAndRequestEventConfiguration.MarshalJSONTemplate(),
- Variables: p.variables,
- DataSource: dataSource,
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{v.eventMetadata.FieldName},
- },
- }
-
- case *KafkaEventManager:
- pubsub, ok := p.kafkaPubSubByProviderID[v.eventMetadata.ProviderID]
- if !ok {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("no pubsub connection exists with provider id \"%s\"", v.eventMetadata.ProviderID))
- return resolve.FetchConfiguration{}
- }
-
- switch v.eventMetadata.Type {
- case EventTypePublish:
- dataSource = &KafkaPublishDataSource{
- pubSub: pubsub,
- }
- case EventTypeRequest:
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("event type \"%s\" is not supported for Kafka", v.eventMetadata.Type))
- return resolve.FetchConfiguration{}
- default:
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure fetch: invalid event type \"%s\" for Kafka", v.eventMetadata.Type))
- return resolve.FetchConfiguration{}
- }
-
- return resolve.FetchConfiguration{
- Input: v.publishEventConfiguration.MarshalJSONTemplate(),
- Variables: p.variables,
- DataSource: dataSource,
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{v.eventMetadata.FieldName},
- },
- }
-
- default:
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure fetch: invalid event manager type: %T", p.eventManager))
- }
-
- return resolve.FetchConfiguration{}
-}
-
-func (p *Planner[T]) ConfigureSubscription() plan.SubscriptionConfiguration {
- if p.eventManager == nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure subscription: event manager is nil"))
- return plan.SubscriptionConfiguration{}
- }
-
- switch v := p.eventManager.(type) {
- case *NatsEventManager:
- pubsub, ok := p.natsPubSubByProviderID[v.eventMetadata.ProviderID]
- if !ok {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("no pubsub connection exists with provider id \"%s\"", v.eventMetadata.ProviderID))
- return plan.SubscriptionConfiguration{}
- }
- object, err := json.Marshal(v.subscriptionEventConfiguration)
- if err != nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to marshal event subscription streamConfiguration"))
- return plan.SubscriptionConfiguration{}
- }
- return plan.SubscriptionConfiguration{
- Input: string(object),
- Variables: p.variables,
- DataSource: &NatsSubscriptionSource{
- pubSub: pubsub,
- },
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{v.eventMetadata.FieldName},
- },
- }
- case *KafkaEventManager:
- pubsub, ok := p.kafkaPubSubByProviderID[v.eventMetadata.ProviderID]
- if !ok {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("no pubsub connection exists with provider id \"%s\"", v.eventMetadata.ProviderID))
- return plan.SubscriptionConfiguration{}
- }
- object, err := json.Marshal(v.subscriptionEventConfiguration)
- if err != nil {
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to marshal event subscription streamConfiguration"))
- return plan.SubscriptionConfiguration{}
- }
- return plan.SubscriptionConfiguration{
- Input: string(object),
- Variables: p.variables,
- DataSource: &KafkaSubscriptionSource{
- pubSub: pubsub,
- },
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{v.eventMetadata.FieldName},
- },
- }
- default:
- p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure subscription: invalid event manager type: %T", p.eventManager))
- }
-
- return plan.SubscriptionConfiguration{}
-}
-
-func (p *Planner[T]) DownstreamResponseFieldAlias(_ int) (alias string, exists bool) {
- return "", false
-}
-
-func NewFactory[T Configuration](executionContext context.Context, natsPubSubByProviderID map[string]NatsPubSub, kafkaPubSubByProviderID map[string]KafkaPubSub) *Factory[T] {
- return &Factory[T]{
- executionContext: executionContext,
- natsPubSubByProviderID: natsPubSubByProviderID,
- kafkaPubSubByProviderID: kafkaPubSubByProviderID,
- }
-}
-
-type Factory[T Configuration] struct {
- executionContext context.Context
- natsPubSubByProviderID map[string]NatsPubSub
- kafkaPubSubByProviderID map[string]KafkaPubSub
-}
-
-func (f *Factory[T]) Planner(_ abstractlogger.Logger) plan.DataSourcePlanner[T] {
- return &Planner[T]{
- natsPubSubByProviderID: f.natsPubSubByProviderID,
- kafkaPubSubByProviderID: f.kafkaPubSubByProviderID,
- }
-}
-
-func (f *Factory[T]) Context() context.Context {
- return f.executionContext
-}
-
-func (f *Factory[T]) UpstreamSchema(_ plan.DataSourceConfiguration[T]) (*ast.Document, bool) {
- return nil, false
-}
-
-func (f *Factory[T]) PlanningBehavior() plan.DataSourcePlanningBehavior {
- return plan.DataSourcePlanningBehavior{
- MergeAliasedRootNodes: false,
- OverrideFieldPathFromAlias: false,
- AllowPlanningTypeName: true,
- }
-}
-
-func buildEventDataBytes(ref int, visitor *plan.Visitor, variables *resolve.Variables) ([]byte, error) {
- // Collect the field arguments for fetch based operations
- fieldArgs := visitor.Operation.FieldArguments(ref)
- var dataBuffer bytes.Buffer
- dataBuffer.WriteByte('{')
- for i, arg := range fieldArgs {
- if i > 0 {
- dataBuffer.WriteByte(',')
- }
- argValue := visitor.Operation.ArgumentValue(arg)
- variableName := visitor.Operation.VariableValueNameBytes(argValue.Ref)
- contextVariable := &resolve.ContextVariable{
- Path: []string{string(variableName)},
- Renderer: resolve.NewJSONVariableRenderer(),
- }
- variablePlaceHolder, _ := variables.AddVariable(contextVariable)
- argumentName := visitor.Operation.ArgumentNameString(arg)
- escapedKey, err := json.Marshal(argumentName)
- if err != nil {
- return nil, err
- }
- dataBuffer.Write(escapedKey)
- dataBuffer.WriteByte(':')
- dataBuffer.WriteString(variablePlaceHolder)
- }
- dataBuffer.WriteByte('}')
- return dataBuffer.Bytes(), nil
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
deleted file mode 100644
index 28a37df..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
+++ /dev/null
@@ -1,669 +0,0 @@
-package pubsub_datasource
-
-import (
- "bytes"
- "context"
- "errors"
- "io"
- "testing"
-
- "github.com/stretchr/testify/require"
-
- "github.com/wundergraph/astjson"
-
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasourcetesting"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/internal/unsafeparser"
-)
-
-type testPubsub struct {
-}
-
-func (t *testPubsub) Subscribe(_ context.Context, _ NatsSubscriptionEventConfiguration, _ resolve.SubscriptionUpdater) error {
- return errors.New("not implemented")
-}
-func (t *testPubsub) Publish(_ context.Context, _ NatsPublishAndRequestEventConfiguration) error {
- return errors.New("not implemented")
-}
-
-func (t *testPubsub) Request(_ context.Context, _ NatsPublishAndRequestEventConfiguration, _ io.Writer) error {
- return errors.New("not implemented")
-}
-
-func TestPubSub(t *testing.T) {
- factory := &Factory[Configuration]{
- natsPubSubByProviderID: map[string]NatsPubSub{"default": &testPubsub{}},
- }
-
- const schema = `
- type Query {
- helloQuery(userKey: UserKey!): User! @edfs__natsRequest(subject: "tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}")
- }
-
- type Mutation {
- helloMutation(userKey: UserKey!): edfs__PublishResult! @edfs__natsPublish(subject: "tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}")
- }
-
- type Subscription {
- helloSubscription(userKey: UserKey!): User! @edfs__natsSubscribe(subjects: ["tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}"])
- subscriptionWithMultipleSubjects(userKeyOne: UserKey!, userKeyTwo: UserKey!): User! @edfs__natsSubscribe(subjects: ["tenantsOne.{{ args.userKeyOne.tenantId }}.users.{{ args.userKeyOne.id }}", "tenantsTwo.{{ args.userKeyTwo.tenantId }}.users.{{ args.userKeyTwo.id }}"])
- subscriptionWithStaticValues: User! @edfs__natsSubscribe(subjects: ["tenants.1.users.1"])
- subscriptionWithArgTemplateAndStaticValue(nestedUserKey: NestedUserKey!): User! @edfs__natsSubscribe(subjects: ["tenants.1.users.{{ args.nestedUserKey.user.id }}"])
- }
-
- type User @key(fields: "id tenant { id }") {
- id: Int! @external
- tenant: Tenant! @external
- }
-
- type Tenant {
- id: Int! @external
- }
-
- input UserKey {
- id: Int!
- tenantId: Int!
- }
-
- input NestedUserKey {
- user: UserInput!
- tenant: TenantInput!
- }
-
- input UserInput {
- id: Int!
- }
-
- input TenantInput {
- id: Int!
- }
-
- type edfs__PublishResult {
- success: Boolean!
- }
-
- input edfs__NatsStreamConfiguration {
- consumerName: String!
- streamName: String!
- }
- `
-
- dataSourceCustomConfig := Configuration{
- Events: []EventConfiguration{
- {
- Metadata: &EventMetadata{
- FieldName: "helloQuery",
- ProviderID: "default",
- Type: EventTypeRequest,
- TypeName: "Query",
- },
- Configuration: &NatsEventConfiguration{
- Subjects: []string{"tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}"},
- },
- },
- {
- Metadata: &EventMetadata{
- FieldName: "helloMutation",
- ProviderID: "default",
- Type: EventTypePublish,
- TypeName: "Mutation",
- },
- Configuration: &NatsEventConfiguration{
- Subjects: []string{"tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}"},
- },
- },
- {
- Metadata: &EventMetadata{
- FieldName: "helloSubscription",
- ProviderID: "default",
- Type: EventTypeSubscribe,
- TypeName: "Subscription",
- },
- Configuration: &NatsEventConfiguration{
- Subjects: []string{"tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}"},
- },
- },
- {
- Metadata: &EventMetadata{
- FieldName: "subscriptionWithMultipleSubjects",
- ProviderID: "default",
- Type: EventTypeSubscribe,
- TypeName: "Subscription",
- },
- Configuration: &NatsEventConfiguration{
- Subjects: []string{"tenantsOne.{{ args.userKeyOne.tenantId }}.users.{{ args.userKeyOne.id }}", "tenantsTwo.{{ args.userKeyTwo.tenantId }}.users.{{ args.userKeyTwo.id }}"},
- },
- },
- {
- Metadata: &EventMetadata{
- FieldName: "subscriptionWithStaticValues",
- ProviderID: "default",
- Type: EventTypeSubscribe,
- TypeName: "Subscription",
- },
- Configuration: &NatsEventConfiguration{
- Subjects: []string{"tenants.1.users.1"},
- },
- },
- {
- Metadata: &EventMetadata{
- FieldName: "subscriptionWithArgTemplateAndStaticValue",
- ProviderID: "default",
- Type: EventTypeSubscribe,
- TypeName: "Subscription",
- },
- Configuration: &NatsEventConfiguration{
- Subjects: []string{"tenants.1.users.{{ args.nestedUserKey.user.id }}"},
- },
- },
- },
- }
-
- dataSourceConfiguration, err := plan.NewDataSourceConfiguration[Configuration](
- "test",
- factory,
- &plan.DataSourceMetadata{
- RootNodes: []plan.TypeField{
- {
- TypeName: "Query",
- FieldNames: []string{"helloQuery"},
- },
- {
- TypeName: "Mutation",
- FieldNames: []string{"helloMutation"},
- },
- {
- TypeName: "Subscription",
- FieldNames: []string{"helloSubscription"},
- },
- {
- TypeName: "Subscription",
- FieldNames: []string{"subscriptionWithMultipleSubjects"},
- },
- {
- TypeName: "Subscription",
- FieldNames: []string{"subscriptionWithStaticValues"},
- },
- {
- TypeName: "Subscription",
- FieldNames: []string{"subscriptionWithArgTemplateAndStaticValue"},
- },
- },
- ChildNodes: []plan.TypeField{
- // Entities are child nodes in pubsub datasources
- {
- TypeName: "User",
- FieldNames: []string{"id", "tenant"},
- },
- {
- TypeName: "Tenant",
- FieldNames: []string{"id"},
- },
- {
- TypeName: "edfs__PublishResult",
- FieldNames: []string{"success"},
- },
- },
- },
- dataSourceCustomConfig,
- )
- require.NoError(t, err)
-
- planConfig := plan.Configuration{
- DataSources: []plan.DataSource{
- dataSourceConfiguration,
- },
- Fields: []plan.FieldConfiguration{
- {
- TypeName: "Query",
- FieldName: "helloQuery",
- Arguments: []plan.ArgumentConfiguration{
- {
- Name: "userKey",
- SourceType: plan.FieldArgumentSource,
- },
- },
- },
- {
- TypeName: "Mutation",
- FieldName: "helloMutation",
- Arguments: []plan.ArgumentConfiguration{
- {
- Name: "userKey",
- SourceType: plan.FieldArgumentSource,
- },
- },
- },
- {
- TypeName: "Subscription",
- FieldName: "helloSubscription",
- Arguments: []plan.ArgumentConfiguration{
- {
- Name: "userKey",
- SourceType: plan.FieldArgumentSource,
- },
- },
- },
- {
- TypeName: "Subscription",
- FieldName: "subscriptionWithMultipleSubjects",
- Arguments: []plan.ArgumentConfiguration{
- {
- Name: "userKeyOne",
- SourceType: plan.FieldArgumentSource,
- },
- {
- Name: "userKeyTwo",
- SourceType: plan.FieldArgumentSource,
- },
- },
- },
- {
- TypeName: "Subscription",
- FieldName: "subscriptionWithArgTemplateAndStaticValue",
- Arguments: []plan.ArgumentConfiguration{
- {
- Name: "nestedUserKey",
- SourceType: plan.FieldArgumentSource,
- },
- },
- },
- },
- DisableResolveFieldPositions: true,
- }
-
- t.Run("query", func(t *testing.T) {
- const operation = "query HelloQuery { helloQuery(userKey:{id:42,tenantId:3}) { id } }"
- const operationName = `HelloQuery`
- expect := &plan.SynchronousResponsePlan{
- Response: &resolve.GraphQLResponse{
- RawFetches: []*resolve.FetchItem{
- {
- Fetch: &resolve.SingleFetch{
- FetchConfiguration: resolve.FetchConfiguration{
- Input: `{"subject":"tenants.$$0$$.users.$$1$$", "data": {"userKey":$$2$$}, "providerId":"default"}`,
- Variables: resolve.Variables{
- &resolve.ContextVariable{
- Path: []string{"a", "tenantId"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- &resolve.ContextVariable{
- Path: []string{"a", "id"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- &resolve.ContextVariable{
- Path: []string{"a"},
- Renderer: resolve.NewJSONVariableRenderer(),
- },
- },
- DataSource: &NatsRequestDataSource{
- pubSub: &testPubsub{},
- },
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{"helloQuery"},
- },
- },
- DataSourceIdentifier: []byte("pubsub_datasource.NatsRequestDataSource"),
- },
- },
- },
- Data: &resolve.Object{
- Fields: []*resolve.Field{
- {
- Name: []byte("helloQuery"),
- Value: &resolve.Object{
- Path: []string{"helloQuery"},
- Nullable: false,
- PossibleTypes: map[string]struct{}{
- "User": {},
- },
- TypeName: "User",
- Fields: []*resolve.Field{
- {
- Name: []byte("id"),
- Value: &resolve.Integer{
- Path: []string{"id"},
- Nullable: false,
- },
- },
- },
- },
- },
- },
- },
- },
- }
- datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
- })
-
- t.Run("mutation", func(t *testing.T) {
- const operation = "mutation HelloMutation { helloMutation(userKey:{id:42,tenantId:3}) { success } }"
- const operationName = `HelloMutation`
- expect := &plan.SynchronousResponsePlan{
- Response: &resolve.GraphQLResponse{
- RawFetches: []*resolve.FetchItem{
- {
- Fetch: &resolve.SingleFetch{
- FetchConfiguration: resolve.FetchConfiguration{
- Input: `{"subject":"tenants.$$0$$.users.$$1$$", "data": {"userKey":$$2$$}, "providerId":"default"}`,
- Variables: resolve.Variables{
- &resolve.ContextVariable{
- Path: []string{"a", "tenantId"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- &resolve.ContextVariable{
- Path: []string{"a", "id"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- &resolve.ContextVariable{
- Path: []string{"a"},
- Renderer: resolve.NewJSONVariableRenderer(),
- },
- },
- DataSource: &NatsPublishDataSource{
- pubSub: &testPubsub{},
- },
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{"helloMutation"},
- },
- },
- DataSourceIdentifier: []byte("pubsub_datasource.NatsPublishDataSource"),
- },
- },
- },
- Data: &resolve.Object{
- Fields: []*resolve.Field{
- {
- Name: []byte("helloMutation"),
- Value: &resolve.Object{
- Path: []string{"helloMutation"},
- Nullable: false,
- PossibleTypes: map[string]struct{}{
- "edfs__PublishResult": {},
- },
- TypeName: "edfs__PublishResult",
- Fields: []*resolve.Field{
- {
- Name: []byte("success"),
- Value: &resolve.Boolean{
- Path: []string{"success"},
- Nullable: false,
- },
- },
- },
- },
- },
- },
- },
- },
- }
- datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
- })
-
- t.Run("subscription", func(t *testing.T) {
- const operation = "subscription HelloSubscription { helloSubscription(userKey:{id:42,tenantId:3}) { id } }"
- const operationName = `HelloSubscription`
- expect := &plan.SubscriptionResponsePlan{
- Response: &resolve.GraphQLSubscription{
- Trigger: resolve.GraphQLSubscriptionTrigger{
- Input: []byte(`{"providerId":"default","subjects":["tenants.$$0$$.users.$$1$$"]}`),
- Variables: resolve.Variables{
- &resolve.ContextVariable{
- Path: []string{"a", "tenantId"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- &resolve.ContextVariable{
- Path: []string{"a", "id"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- },
- Source: &NatsSubscriptionSource{
- pubSub: &testPubsub{},
- },
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{"helloSubscription"},
- },
- },
- Response: &resolve.GraphQLResponse{
- Data: &resolve.Object{
- Fields: []*resolve.Field{
- {
- Name: []byte("helloSubscription"),
- Value: &resolve.Object{
- Path: []string{"helloSubscription"},
- Nullable: false,
- PossibleTypes: map[string]struct{}{
- "User": {},
- },
- TypeName: "User",
- Fields: []*resolve.Field{
- {
- Name: []byte("id"),
- Value: &resolve.Integer{
- Path: []string{"id"},
- Nullable: false,
- },
- },
- },
- },
- },
- },
- },
- },
- },
- }
- datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
- })
-
- t.Run("subscription with multiple subjects", func(t *testing.T) {
- const operation = "subscription SubscriptionWithMultipleSubjects { subscriptionWithMultipleSubjects(userKeyOne:{id:42,tenantId:3},userKeyTwo:{id:24,tenantId:99}) { id } }"
- const operationName = `SubscriptionWithMultipleSubjects`
- expect := &plan.SubscriptionResponsePlan{
- Response: &resolve.GraphQLSubscription{
- Trigger: resolve.GraphQLSubscriptionTrigger{
- Input: []byte(`{"providerId":"default","subjects":["tenantsOne.$$0$$.users.$$1$$","tenantsTwo.$$2$$.users.$$3$$"]}`),
- Variables: resolve.Variables{
- &resolve.ContextVariable{
- Path: []string{"a", "tenantId"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- &resolve.ContextVariable{
- Path: []string{"a", "id"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- &resolve.ContextVariable{
- Path: []string{"b", "tenantId"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- &resolve.ContextVariable{
- Path: []string{"b", "id"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- },
- Source: &NatsSubscriptionSource{
- pubSub: &testPubsub{},
- },
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{"subscriptionWithMultipleSubjects"},
- },
- },
- Response: &resolve.GraphQLResponse{
- Data: &resolve.Object{
- Fields: []*resolve.Field{
- {
- Name: []byte("subscriptionWithMultipleSubjects"),
- Value: &resolve.Object{
- Path: []string{"subscriptionWithMultipleSubjects"},
- Nullable: false,
- PossibleTypes: map[string]struct{}{
- "User": {},
- },
- TypeName: "User",
- Fields: []*resolve.Field{
- {
- Name: []byte("id"),
- Value: &resolve.Integer{
- Path: []string{"id"},
- Nullable: false,
- },
- },
- },
- },
- },
- },
- },
- },
- },
- }
- datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
- })
-
- t.Run("subscription with only static values", func(t *testing.T) {
- const operation = "subscription SubscriptionWithStaticValues { subscriptionWithStaticValues { id } }"
- const operationName = `SubscriptionWithStaticValues`
- expect := &plan.SubscriptionResponsePlan{
- Response: &resolve.GraphQLSubscription{
- Trigger: resolve.GraphQLSubscriptionTrigger{
- Input: []byte(`{"providerId":"default","subjects":["tenants.1.users.1"]}`),
- Source: &NatsSubscriptionSource{
- pubSub: &testPubsub{},
- },
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{"subscriptionWithStaticValues"},
- },
- },
- Response: &resolve.GraphQLResponse{
- Data: &resolve.Object{
- Fields: []*resolve.Field{
- {
- Name: []byte("subscriptionWithStaticValues"),
- Value: &resolve.Object{
- Path: []string{"subscriptionWithStaticValues"},
- Nullable: false,
- PossibleTypes: map[string]struct{}{
- "User": {},
- },
- TypeName: "User",
- Fields: []*resolve.Field{
- {
- Name: []byte("id"),
- Value: &resolve.Integer{
- Path: []string{"id"},
- Nullable: false,
- },
- },
- },
- },
- },
- },
- },
- },
- },
- }
- datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
- })
-
- t.Run("subscription with deeply nested argument and static value", func(t *testing.T) {
- const operation = "subscription SubscriptionWithArgTemplateAndStaticValue { subscriptionWithArgTemplateAndStaticValue(nestedUserKey: { user: { id: 44, tenantId: 2 } }) { id } }"
- const operationName = `SubscriptionWithArgTemplateAndStaticValue`
- expect := &plan.SubscriptionResponsePlan{
- Response: &resolve.GraphQLSubscription{
- Trigger: resolve.GraphQLSubscriptionTrigger{
- Input: []byte(`{"providerId":"default","subjects":["tenants.1.users.$$0$$"]}`),
- Variables: resolve.Variables{
- &resolve.ContextVariable{
- Path: []string{"a", "user", "id"},
- Renderer: resolve.NewPlainVariableRenderer(),
- },
- },
- Source: &NatsSubscriptionSource{
- pubSub: &testPubsub{},
- },
- PostProcessing: resolve.PostProcessingConfiguration{
- MergePath: []string{"subscriptionWithArgTemplateAndStaticValue"},
- },
- },
- Response: &resolve.GraphQLResponse{
- Data: &resolve.Object{
- Fields: []*resolve.Field{
- {
- Name: []byte("subscriptionWithArgTemplateAndStaticValue"),
- Value: &resolve.Object{
- Path: []string{"subscriptionWithArgTemplateAndStaticValue"},
- Nullable: false,
- PossibleTypes: map[string]struct{}{
- "User": {},
- },
- TypeName: "User",
- Fields: []*resolve.Field{
- {
- Name: []byte("id"),
- Value: &resolve.Integer{
- Path: []string{"id"},
- Nullable: false,
- },
- },
- },
- },
- },
- },
- },
- },
- },
- }
- datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
- })
-}
-
-func TestBuildEventDataBytes(t *testing.T) {
- t.Run("check string serialization", func(t *testing.T) {
- const operation = "mutation HelloMutation($id: ID!) { helloMutation(userKey:{id:$id,tenantId:3}) { success } }"
- op := unsafeparser.ParseGraphqlDocumentString(operation)
- var vars resolve.Variables
- visitor := plan.Visitor{
- Operation: &op,
- }
- _, err := buildEventDataBytes(1, &visitor, &vars)
- require.NoError(t, err)
- require.Len(t, vars, 1)
-
- template := resolve.InputTemplate{
- Segments: []resolve.TemplateSegment{
- vars[0].TemplateSegment(),
- },
- }
- ctx := &resolve.Context{
- Variables: astjson.MustParseBytes([]byte(`{"id":"asdf"}`)),
- }
- buf := &bytes.Buffer{}
- err = template.Render(ctx, nil, buf)
- require.NoError(t, err)
- require.Equal(t, `"asdf"`, buf.String())
- })
-
- t.Run("check int serialization", func(t *testing.T) {
- const operation = "mutation HelloMutation($id: Int!) { helloMutation(userKey:{id:$id,tenantId:3}) { success } }"
- op := unsafeparser.ParseGraphqlDocumentString(operation)
- var vars resolve.Variables
- visitor := plan.Visitor{
- Operation: &op,
- }
- _, err := buildEventDataBytes(1, &visitor, &vars)
- require.NoError(t, err)
- require.Len(t, vars, 1)
-
- template := resolve.InputTemplate{
- Segments: []resolve.TemplateSegment{
- vars[0].TemplateSegment(),
- },
- }
- ctx := &resolve.Context{
- Variables: astjson.MustParseBytes([]byte(`{"id":5}`)),
- }
- buf := &bytes.Buffer{}
- err = template.Render(ctx, nil, buf)
- require.NoError(t, err)
- require.Equal(t, `5`, buf.String())
- })
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go
deleted file mode 100644
index cc562b8..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go
+++ /dev/null
@@ -1,88 +0,0 @@
-package pubsub_datasource
-
-import (
- "bytes"
- "context"
- "encoding/json"
- "io"
-
- "github.com/buger/jsonparser"
- "github.com/cespare/xxhash/v2"
-
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-type KafkaEventConfiguration struct {
- Topics []string `json:"topics"`
-}
-
-type KafkaConnector interface {
- New(ctx context.Context) KafkaPubSub
-}
-
-// KafkaPubSub describe the interface that implements the primitive operations for pubsub
-type KafkaPubSub interface {
- // Subscribe starts listening on the given subjects and sends the received messages to the given next channel
- Subscribe(ctx context.Context, config KafkaSubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error
- // Publish sends the given data to the given subject
- Publish(ctx context.Context, config KafkaPublishEventConfiguration) error
-}
-
-type KafkaSubscriptionSource struct {
- pubSub KafkaPubSub
-}
-
-func (s *KafkaSubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) error {
-
- val, _, _, err := jsonparser.Get(input, "topics")
- if err != nil {
- return err
- }
-
- _, err = xxh.Write(val)
- if err != nil {
- return err
- }
-
- val, _, _, err = jsonparser.Get(input, "providerId")
- if err != nil {
- return err
- }
-
- _, err = xxh.Write(val)
- return err
-}
-
-func (s *KafkaSubscriptionSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error {
- var subscriptionConfiguration KafkaSubscriptionEventConfiguration
- err := json.Unmarshal(input, &subscriptionConfiguration)
- if err != nil {
- return err
- }
-
- return s.pubSub.Subscribe(ctx.Context(), subscriptionConfiguration, updater)
-}
-
-type KafkaPublishDataSource struct {
- pubSub KafkaPubSub
-}
-
-func (s *KafkaPublishDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error {
- var publishConfiguration KafkaPublishEventConfiguration
- err := json.Unmarshal(input, &publishConfiguration)
- if err != nil {
- return err
- }
-
- if err := s.pubSub.Publish(ctx, publishConfiguration); err != nil {
- _, err = io.WriteString(out, `{"success": false}`)
- return err
- }
- _, err = io.WriteString(out, `{"success": true}`)
- return err
-}
-
-func (s *KafkaPublishDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) (err error) {
- panic("not implemented")
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go
deleted file mode 100644
index 31cb6d4..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package pubsub_datasource
-
-import (
- "bytes"
- "context"
- "encoding/json"
- "io"
-
- "github.com/buger/jsonparser"
- "github.com/cespare/xxhash/v2"
-
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
- "github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-type NatsStreamConfiguration struct {
- Consumer string `json:"consumer"`
- ConsumerInactiveThreshold int32 `json:"consumerInactiveThreshold"`
- StreamName string `json:"streamName"`
-}
-
-type NatsEventConfiguration struct {
- StreamConfiguration *NatsStreamConfiguration `json:"streamConfiguration,omitempty"`
- Subjects []string `json:"subjects"`
-}
-
-type NatsConnector interface {
- New(ctx context.Context) NatsPubSub
-}
-
-// NatsPubSub describe the interface that implements the primitive operations for pubsub
-type NatsPubSub interface {
- // Subscribe starts listening on the given subjects and sends the received messages to the given next channel
- Subscribe(ctx context.Context, event NatsSubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error
- // Publish sends the given data to the given subject
- Publish(ctx context.Context, event NatsPublishAndRequestEventConfiguration) error
- // Request sends a request on the given subject and writes the response to the given writer
- Request(ctx context.Context, event NatsPublishAndRequestEventConfiguration, w io.Writer) error
-}
-
-type NatsSubscriptionSource struct {
- pubSub NatsPubSub
-}
-
-func (s *NatsSubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) error {
-
- val, _, _, err := jsonparser.Get(input, "subjects")
- if err != nil {
- return err
- }
-
- _, err = xxh.Write(val)
- if err != nil {
- return err
- }
-
- val, _, _, err = jsonparser.Get(input, "providerId")
- if err != nil {
- return err
- }
-
- _, err = xxh.Write(val)
- return err
-}
-
-func (s *NatsSubscriptionSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error {
- var subscriptionConfiguration NatsSubscriptionEventConfiguration
- err := json.Unmarshal(input, &subscriptionConfiguration)
- if err != nil {
- return err
- }
-
- return s.pubSub.Subscribe(ctx.Context(), subscriptionConfiguration, updater)
-}
-
-type NatsPublishDataSource struct {
- pubSub NatsPubSub
-}
-
-func (s *NatsPublishDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error {
- var publishConfiguration NatsPublishAndRequestEventConfiguration
- err := json.Unmarshal(input, &publishConfiguration)
- if err != nil {
- return err
- }
-
- if err := s.pubSub.Publish(ctx, publishConfiguration); err != nil {
- _, err = io.WriteString(out, `{"success": false}`)
- return err
- }
-
- _, err = io.WriteString(out, `{"success": true}`)
- return err
-}
-
-func (s *NatsPublishDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) error {
- panic("not implemented")
-}
-
-type NatsRequestDataSource struct {
- pubSub NatsPubSub
-}
-
-func (s *NatsRequestDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error {
- var subscriptionConfiguration NatsPublishAndRequestEventConfiguration
- err := json.Unmarshal(input, &subscriptionConfiguration)
- if err != nil {
- return err
- }
-
- return s.pubSub.Request(ctx, subscriptionConfiguration, out)
-}
-
-func (s *NatsRequestDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) error {
- panic("not implemented")
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/resolve/datasource.go b/app/v2/pkg/engine/resolve/datasource.go
index c679d76..ee8fdbb 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/resolve/datasource.go
+++ b/app/v2/pkg/engine/resolve/datasource.go
@@ -26,3 +26,10 @@ type AsyncSubscriptionDataSource interface {
AsyncStop(id uint64)
UniqueRequestID(ctx *Context, input []byte, xxh *xxhash.Digest) (err error)
}
+
+// HookableSubscriptionDataSource can run startup logic for each subscription before the shared trigger starts.
+type HookableSubscriptionDataSource interface {
+ // SubscriptionOnStart is called when a new subscription is created.
+ // Returning an error prevents that subscription from starting.
+ SubscriptionOnStart(ctx StartupHookContext, input []byte) (err error)
+}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/resolve/resolve.go b/app/v2/pkg/engine/resolve/resolve.go
index 14d8ad4..1869dc4 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/resolve/resolve.go
+++ b/app/v2/pkg/engine/resolve/resolve.go
@@ -297,6 +297,15 @@ type trigger struct {
subscriptions map[*Context]*sub
// initialized is set to true when the trigger is started and initialized
initialized bool
+ updater *subscriptionUpdater
+}
+
+func (t *trigger) subscriptionIDs() map[context.Context]SubscriptionIdentifier {
+ subs := make(map[context.Context]SubscriptionIdentifier, len(t.subscriptions))
+ for ctx, sub := range t.subscriptions {
+ subs[ctx.Context()] = sub.id
+ }
+ return subs
}
// workItem is used to encapsulate a function that needs to be
@@ -502,11 +511,13 @@ func (r *Resolver) handleEvent(event subscriptionEvent) {
case subscriptionEventKindAddSubscription:
r.handleAddSubscription(event.triggerID, event.addSubscription)
case subscriptionEventKindRemoveSubscription:
- r.handleRemoveSubscription(event.id)
+ r.handleRemoveSubscription(event.id, event.closeKind)
case subscriptionEventKindCompleteSubscription:
r.handleCompleteSubscription(event.id)
case subscriptionEventKindRemoveClient:
r.handleRemoveClient(event.id.ConnectionID)
+ case subscriptionEventKindUpdateSubscription:
+ r.handleUpdateSubscription(event.triggerID, event.data, event.id)
case subscriptionEventKindTriggerUpdate:
r.handleTriggerUpdate(event.triggerID, event.data)
case subscriptionEventKindTriggerComplete:
@@ -581,6 +592,37 @@ func (r *Resolver) handleTriggerComplete(triggerID uint64) {
r.completeTrigger(triggerID)
}
+type StartupHookContext struct {
+ Context context.Context
+ Updater func(data []byte)
+}
+
+func (r *Resolver) executeStartupHooks(add *addSubscription, updater *subscriptionUpdater) error {
+ hook, ok := add.resolve.Trigger.Source.(HookableSubscriptionDataSource)
+ if !ok {
+ return nil
+ }
+
+ hookCtx := StartupHookContext{
+ Context: add.ctx.Context(),
+ Updater: func(data []byte) {
+ updater.UpdateSubscription(add.id, data)
+ },
+ }
+
+ err := hook.SubscriptionOnStart(hookCtx, add.input)
+ if err != nil {
+ if r.options.Debug {
+ fmt.Printf("resolver:trigger:subscription:startup:failed:%d\n", add.id.SubscriptionID)
+ }
+ r.asyncErrorWriter.WriteError(add.ctx, err, add.resolve.Response, add.writer)
+ _ = r.AsyncUnsubscribeSubscription(add.id)
+ return err
+ }
+
+ return nil
+}
+
func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription) {
var (
err error
@@ -608,13 +650,18 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription)
trig, ok := r.triggers[triggerID]
if ok {
- trig.subscriptions[add.ctx] = s
if r.reporter != nil {
r.reporter.SubscriptionCountInc(1)
}
if r.options.Debug {
fmt.Printf("resolver:trigger:subscription:added:%d:%d\n", triggerID, add.id.SubscriptionID)
}
+ s.workChan <- workItem{
+ fn: func() {
+ _ = r.executeStartupHooks(add, trig.updater)
+ },
+ }
+ trig.subscriptions[add.ctx] = s
return
}
@@ -633,9 +680,11 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription)
id: triggerID,
subscriptions: make(map[*Context]*sub),
cancel: cancel,
+ updater: updater,
}
r.triggers[triggerID] = trig
trig.subscriptions[add.ctx] = s
+ updater.subsFn = trig.subscriptionIDs
if r.reporter != nil {
r.reporter.SubscriptionCountInc(1)
@@ -655,10 +704,14 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription)
if r.options.Debug {
fmt.Printf("resolver:trigger:start:%d\n", triggerID)
}
+ err = r.executeStartupHooks(add, trig.updater)
+ if err != nil {
+ return
+ }
if asyncDataSource != nil {
- err = asyncDataSource.AsyncStart(cloneCtx, triggerID, add.input, updater)
+ err = asyncDataSource.AsyncStart(cloneCtx, triggerID, add.input, trig.updater)
} else {
- err = add.resolve.Trigger.Source.Start(cloneCtx, add.input, updater)
+ err = add.resolve.Trigger.Source.Start(cloneCtx, add.input, trig.updater)
}
if err != nil {
if r.options.Debug {
@@ -732,18 +785,21 @@ func (r *Resolver) handleCompleteSubscription(id SubscriptionIdentifier) {
}
}
-func (r *Resolver) handleRemoveSubscription(id SubscriptionIdentifier) {
+func (r *Resolver) handleRemoveSubscription(id SubscriptionIdentifier, closeKind SubscriptionCloseKind) {
if r.options.Debug {
fmt.Printf("resolver:trigger:subscription:remove:%d:%d\n", id.ConnectionID, id.SubscriptionID)
}
+ if closeKind.WSCode == 0 && closeKind.Reason == "" {
+ closeKind = SubscriptionCloseKindNormal
+ }
removed := 0
for u := range r.triggers {
trig := r.triggers[u]
- removed += r.closeTriggerSubscriptions(u, SubscriptionCloseKindNormal, func(sID SubscriptionIdentifier) bool {
+ removed += r.closeTriggerSubscriptions(u, closeKind, func(sID SubscriptionIdentifier) bool {
return sID == id
})
if len(trig.subscriptions) == 0 {
- r.closeTrigger(trig.id, SubscriptionCloseKindNormal)
+ r.closeTrigger(trig.id, closeKind)
}
}
if r.reporter != nil {
@@ -779,32 +835,50 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
}
for c, s := range trig.subscriptions {
- c, s := c, s
- if err := c.ctx.Err(); err != nil {
- continue // no need to schedule an event update when the client already disconnected
- }
- skip, err := s.resolve.Filter.SkipEvent(c, data, r.triggerUpdateBuf)
- if err != nil {
- r.asyncErrorWriter.WriteError(c, err, s.resolve.Response, s.writer)
- continue
- }
- if skip {
+ r.sendUpdateToSubscription(data, c, s)
+ }
+}
+
+func (r *Resolver) handleUpdateSubscription(id uint64, data []byte, subIdentifier SubscriptionIdentifier) {
+ trig, ok := r.triggers[id]
+ if !ok {
+ return
+ }
+ if r.options.Debug {
+ fmt.Printf("resolver:trigger:subscription:update:%d:%d,%d\n", id, subIdentifier.ConnectionID, subIdentifier.SubscriptionID)
+ }
+
+ for c, s := range trig.subscriptions {
+ if s.id != subIdentifier {
continue
}
+ r.sendUpdateToSubscription(data, c, s)
+ break
+ }
+}
- fn := func() {
- r.executeSubscriptionUpdate(c, s, data)
- }
+func (r *Resolver) sendUpdateToSubscription(data []byte, c *Context, s *sub) {
+ if err := c.ctx.Err(); err != nil {
+ return
+ }
+ skip, err := s.resolve.Filter.SkipEvent(c, data, r.triggerUpdateBuf)
+ if err != nil {
+ r.asyncErrorWriter.WriteError(c, err, s.resolve.Response, s.writer)
+ return
+ }
+ if skip {
+ return
+ }
- select {
- case <-r.ctx.Done():
- // Skip sending all events if the resolver is shutting down
- return
- case <-c.ctx.Done():
- // Skip sending the event if the client disconnected
- case s.workChan <- workItem{fn, false}:
- // Send the event to the subscription worker
- }
+ fn := func() {
+ r.executeSubscriptionUpdate(c, s, data)
+ }
+
+ select {
+ case <-r.ctx.Done():
+ return
+ case <-c.ctx.Done():
+ case s.workChan <- workItem{fn, false}:
}
}
@@ -1205,6 +1279,7 @@ type subscriptionUpdater struct {
triggerID uint64
ch chan subscriptionEvent
ctx context.Context
+ subsFn func() map[context.Context]SubscriptionIdentifier
}
func (s *subscriptionUpdater) Update(data []byte) {
@@ -1224,6 +1299,30 @@ func (s *subscriptionUpdater) Update(data []byte) {
}
}
+func (s *subscriptionUpdater) UpdateSubscription(id SubscriptionIdentifier, data []byte) {
+ if s.debug {
+ fmt.Printf("resolver:subscription_updater:update:%d\n", s.triggerID)
+ }
+
+ select {
+ case <-s.ctx.Done():
+ return
+ case s.ch <- subscriptionEvent{
+ triggerID: s.triggerID,
+ kind: subscriptionEventKindUpdateSubscription,
+ data: data,
+ id: id,
+ }:
+ }
+}
+
+func (s *subscriptionUpdater) Subscriptions() map[context.Context]SubscriptionIdentifier {
+ if s.subsFn == nil {
+ return nil
+ }
+ return s.subsFn()
+}
+
func (s *subscriptionUpdater) Complete() {
if s.debug {
fmt.Printf("resolver:subscription_updater:complete:%d\n", s.triggerID)
@@ -1269,6 +1368,29 @@ func (s *subscriptionUpdater) Close(kind SubscriptionCloseKind) {
}
}
+func (s *subscriptionUpdater) CloseSubscription(kind SubscriptionCloseKind, id SubscriptionIdentifier) {
+ if s.debug {
+ fmt.Printf("resolver:subscription_updater:close:%d\n", s.triggerID)
+ }
+
+ select {
+ case <-s.ctx.Done():
+ if s.debug {
+ fmt.Printf("resolver:subscription_updater:close:skip:%d\n", s.triggerID)
+ }
+ return
+ case s.ch <- subscriptionEvent{
+ triggerID: s.triggerID,
+ kind: subscriptionEventKindRemoveSubscription,
+ closeKind: kind,
+ id: id,
+ }:
+ if s.debug {
+ fmt.Printf("resolver:subscription_updater:close:sent_event:%d\n", s.triggerID)
+ }
+ }
+}
+
type subscriptionEvent struct {
triggerID uint64
id SubscriptionIdentifier
@@ -1299,13 +1421,20 @@ const (
subscriptionEventKindRemoveClient
subscriptionEventKindTriggerInitialized
subscriptionEventKindTriggerClose
+ subscriptionEventKindUpdateSubscription
)
type SubscriptionUpdater interface {
// Update sends an update to the client. It is not guaranteed that the update is sent immediately.
Update(data []byte)
+ // UpdateSubscription sends an update to a single subscription. It is not guaranteed that the update is sent immediately.
+ UpdateSubscription(id SubscriptionIdentifier, data []byte)
// Complete also takes care of cleaning up the trigger and all subscriptions. No more updates should be sent after calling Complete.
Complete()
// Close closes the subscription and cleans up the trigger and all subscriptions. No more updates should be sent after calling Close.
Close(kind SubscriptionCloseKind)
+ // CloseSubscription closes a single subscription.
+ CloseSubscription(kind SubscriptionCloseKind, id SubscriptionIdentifier)
+ // Subscriptions returns the subscriptions associated with this updater.
+ Subscriptions() map[context.Context]SubscriptionIdentifier
}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/resolve/resolve_test.go b/app/v2/pkg/engine/resolve/resolve_test.go
index 8e15ff9..37f3d08 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-4/app/v2/pkg/engine/resolve/resolve_test.go
+++ b/app/v2/pkg/engine/resolve/resolve_test.go
@@ -4794,12 +4794,16 @@ func (s *SubscriptionRecorder) Messages() []string {
return s.messages
}
-func createFakeStream(messageFunc messageFunc, delay time.Duration, onStart func(input []byte)) *_fakeStream {
- return &_fakeStream{
+func createFakeStream(messageFunc messageFunc, delay time.Duration, onStart func(input []byte), subscriptionOnStartFns ...func(ctx StartupHookContext, input []byte) (err error)) *_fakeStream {
+ stream := &_fakeStream{
messageFunc: messageFunc,
delay: delay,
onStart: onStart,
}
+ if len(subscriptionOnStartFns) > 0 {
+ stream.subscriptionOnStartFn = subscriptionOnStartFns[0]
+ }
+ return stream
}
type messageFunc func(counter int) (message string, done bool)
@@ -4807,10 +4811,19 @@ type messageFunc func(counter int) (message string, done bool)
var fakeStreamRequestId atomic.Int32
type _fakeStream struct {
- messageFunc messageFunc
- onStart func(input []byte)
- delay time.Duration
- isDone atomic.Bool
+ uniqueRequestFn func(ctx *Context, input []byte, xxh *xxhash.Digest) (err error)
+ messageFunc messageFunc
+ onStart func(input []byte)
+ delay time.Duration
+ isDone atomic.Bool
+ subscriptionOnStartFn func(ctx StartupHookContext, input []byte) (err error)
+}
+
+func (f *_fakeStream) SubscriptionOnStart(ctx StartupHookContext, input []byte) (err error) {
+ if f.subscriptionOnStartFn == nil {
+ return nil
+ }
+ return f.subscriptionOnStartFn(ctx, input)
}
func (f *_fakeStream) AwaitIsDone(t *testing.T, timeout time.Duration) {
@@ -4828,6 +4841,9 @@ func (f *_fakeStream) AwaitIsDone(t *testing.T, timeout time.Duration) {
}
func (f *_fakeStream) UniqueRequestID(ctx *Context, input []byte, xxh *xxhash.Digest) (err error) {
+ if f.uniqueRequestFn != nil {
+ return f.uniqueRequestFn(ctx, input, xxh)
+ }
_, err = fmt.Fprint(xxh, fakeStreamRequestId.Add(1))
if err != nil {
return
@@ -5441,6 +5457,83 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) {
recorder.AwaitClosed(t, defaultTimeout)
fakeStream.AwaitIsDone(t, defaultTimeout)
})
+
+ t.Run("should call subscription startup hook", func(t *testing.T) {
+ c, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ called := make(chan struct{}, 1)
+ fakeStream := createFakeStream(func(counter int) (message string, done bool) {
+ return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 0
+ }, time.Millisecond, func(input []byte) {
+ assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
+ }, func(ctx StartupHookContext, input []byte) (err error) {
+ called <- struct{}{}
+ return nil
+ })
+
+ resolver, plan, recorder, id := setup(c, fakeStream)
+ ctx := &Context{ctx: context.Background()}
+
+ err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
+ assert.NoError(t, err)
+
+ select {
+ case <-called:
+ case <-time.After(defaultTimeout):
+ t.Fatal("SubscriptionOnStart hook was not called")
+ }
+
+ recorder.AwaitComplete(t, defaultTimeout)
+ })
+
+ t.Run("startup hook can emit an initial update", func(t *testing.T) {
+ c, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ fakeStream := createFakeStream(func(counter int) (message string, done bool) {
+ return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 0
+ }, time.Millisecond, func(input []byte) {
+ assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
+ }, func(ctx StartupHookContext, input []byte) (err error) {
+ ctx.Updater([]byte(`{"data":{"counter":1000}}`))
+ return nil
+ })
+
+ resolver, plan, recorder, id := setup(c, fakeStream)
+
+ ctx := &Context{ctx: context.Background()}
+ err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
+ assert.NoError(t, err)
+
+ recorder.AwaitComplete(t, defaultTimeout)
+
+ assert.Equal(t, []string{
+ `{"data":{"counter":1000}}`,
+ `{"data":{"counter":0}}`,
+ }, recorder.Messages())
+ })
+
+ t.Run("should propagate startup hook errors", func(t *testing.T) {
+ c, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ expectedErr := errors.New("startup hook failed")
+ fakeStream := createFakeStream(func(counter int) (message string, done bool) {
+ return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 0
+ }, time.Millisecond, func(input []byte) {
+ assert.Equal(t, `{"method":"POST","url":"http://localhost:4000","body":{"query":"subscription { counter }"}}`, string(input))
+ }, func(ctx StartupHookContext, input []byte) (err error) {
+ return expectedErr
+ })
+
+ resolver, plan, recorder, id := setup(c, fakeStream)
+ ctx := &Context{ctx: context.Background()}
+
+ err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
+ assert.NoError(t, err)
+ recorder.AwaitClosed(t, defaultTimeout)
+ })
}
func Test_ResolveGraphQLSubscriptionWithFilter(t *testing.T) {