STET

agent_patch

runs/2026-02-28__13-38-10__gpt-5-3-codex/flux-pr-1309/flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/agent-logs/agent.patch

75560 bytes

Back to adjudication
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
index 7f41943..981981f 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
+++ b/app/v2/pkg/engine/datasource/graphql_datasource/graphql_datasource_test.go
@@ -8272,6 +8272,10 @@ func (t *testSubscriptionUpdaterChan) Update(data []byte) {
 	t.updates <- string(data)
 }
 
+func (t *testSubscriptionUpdaterChan) UpdateSubscription(_ resolve.SubscriptionIdentifier, data []byte) {
+	t.Update(data)
+}
+
 func (t *testSubscriptionUpdaterChan) Complete() {
 	close(t.complete)
 }
@@ -8280,6 +8284,10 @@ func (t *testSubscriptionUpdaterChan) Close(kind resolve.SubscriptionCloseKind)
 	t.closed <- kind
 }
 
+func (t *testSubscriptionUpdaterChan) CloseSubscription(_ resolve.SubscriptionIdentifier, kind resolve.SubscriptionCloseKind) {
+	t.Close(kind)
+}
+
 func (t *testSubscriptionUpdaterChan) AwaitUpdateWithT(tt *testing.T, timeout time.Duration, f func(t *testing.T, update string), msgAndArgs ...any) {
 	tt.Helper()
 
@@ -8385,6 +8393,10 @@ func (t *testSubscriptionUpdater) Update(data []byte) {
 	t.updates = append(t.updates, string(data))
 }
 
+func (t *testSubscriptionUpdater) UpdateSubscription(_ resolve.SubscriptionIdentifier, data []byte) {
+	t.Update(data)
+}
+
 func (t *testSubscriptionUpdater) Complete() {
 	t.mux.Lock()
 	defer t.mux.Unlock()
@@ -8397,6 +8409,10 @@ func (t *testSubscriptionUpdater) Close(kind resolve.SubscriptionCloseKind) {
 	t.closed = true
 }
 
+func (t *testSubscriptionUpdater) CloseSubscription(_ resolve.SubscriptionIdentifier, kind resolve.SubscriptionCloseKind) {
+	t.Close(kind)
+}
+
 func TestSubscriptionSource_Start(t *testing.T) {
 	chatServer := httptest.NewServer(subscriptiontesting.ChatGraphQLEndpointHandler())
 	defer chatServer.Close()
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go
deleted file mode 100644
index 80b02e1..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/kafka_event_manager.go
+++ /dev/null
@@ -1,72 +0,0 @@
-package pubsub_datasource
-
-import (
-	"encoding/json"
-	"fmt"
-	"slices"
-
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-type KafkaSubscriptionEventConfiguration struct {
-	ProviderID string   `json:"providerId"`
-	Topics     []string `json:"topics"`
-}
-
-type KafkaPublishEventConfiguration struct {
-	ProviderID string          `json:"providerId"`
-	Topic      string          `json:"topic"`
-	Data       json.RawMessage `json:"data"`
-}
-
-func (s *KafkaPublishEventConfiguration) MarshalJSONTemplate() string {
-	return fmt.Sprintf(`{"topic":"%s", "data": %s, "providerId":"%s"}`, s.Topic, s.Data, s.ProviderID)
-}
-
-type KafkaEventManager struct {
-	visitor                        *plan.Visitor
-	variables                      *resolve.Variables
-	eventMetadata                  EventMetadata
-	eventConfiguration             *KafkaEventConfiguration
-	publishEventConfiguration      *KafkaPublishEventConfiguration
-	subscriptionEventConfiguration *KafkaSubscriptionEventConfiguration
-}
-
-func (p *KafkaEventManager) eventDataBytes(ref int) ([]byte, error) {
-	return buildEventDataBytes(ref, p.visitor, p.variables)
-}
-
-func (p *KafkaEventManager) handlePublishEvent(ref int) {
-	if len(p.eventConfiguration.Topics) != 1 {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("publish and request events should define one subject but received %d", len(p.eventConfiguration.Topics)))
-		return
-	}
-	topic := p.eventConfiguration.Topics[0]
-	dataBytes, err := p.eventDataBytes(ref)
-	if err != nil {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to write event data bytes: %w", err))
-		return
-	}
-
-	p.publishEventConfiguration = &KafkaPublishEventConfiguration{
-		ProviderID: p.eventMetadata.ProviderID,
-		Topic:      topic,
-		Data:       dataBytes,
-	}
-}
-
-func (p *KafkaEventManager) handleSubscriptionEvent(ref int) {
-
-	if len(p.eventConfiguration.Topics) == 0 {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("expected at least one subscription topic but received %d", len(p.eventConfiguration.Topics)))
-		return
-	}
-
-	slices.Sort(p.eventConfiguration.Topics)
-
-	p.subscriptionEventConfiguration = &KafkaSubscriptionEventConfiguration{
-		ProviderID: p.eventMetadata.ProviderID,
-		Topics:     p.eventConfiguration.Topics,
-	}
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
deleted file mode 100644
index 04fc7e7..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/nats_event_manager.go
+++ /dev/null
@@ -1,189 +0,0 @@
-package pubsub_datasource
-
-import (
-	"encoding/json"
-	"fmt"
-	"regexp"
-	"slices"
-	"strings"
-
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/argument_templates"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-const (
-	fwc  = '>'
-	tsep = "."
-)
-
-// A variable template has form $$number$$ where the number can range from one to multiple digits
-var (
-	variableTemplateRegex = regexp.MustCompile(`\$\$\d+\$\$`)
-)
-
-type NatsSubscriptionEventConfiguration struct {
-	ProviderID          string                   `json:"providerId"`
-	Subjects            []string                 `json:"subjects"`
-	StreamConfiguration *NatsStreamConfiguration `json:"streamConfiguration,omitempty"`
-}
-
-type NatsPublishAndRequestEventConfiguration struct {
-	ProviderID string          `json:"providerId"`
-	Subject    string          `json:"subject"`
-	Data       json.RawMessage `json:"data"`
-}
-
-func (s *NatsPublishAndRequestEventConfiguration) MarshalJSONTemplate() string {
-	return fmt.Sprintf(`{"subject":"%s", "data": %s, "providerId":"%s"}`, s.Subject, s.Data, s.ProviderID)
-}
-
-type NatsEventManager struct {
-	visitor                             *plan.Visitor
-	variables                           *resolve.Variables
-	eventMetadata                       EventMetadata
-	eventConfiguration                  *NatsEventConfiguration
-	publishAndRequestEventConfiguration *NatsPublishAndRequestEventConfiguration
-	subscriptionEventConfiguration      *NatsSubscriptionEventConfiguration
-}
-
-func isValidNatsSubject(subject string) bool {
-	if subject == "" {
-		return false
-	}
-	sfwc := false
-	tokens := strings.Split(subject, tsep)
-	for _, t := range tokens {
-		length := len(t)
-		if length == 0 || sfwc {
-			return false
-		}
-		if length > 1 {
-			if strings.ContainsAny(t, "\t\n\f\r ") {
-				return false
-			}
-			continue
-		}
-		switch t[0] {
-		case fwc:
-			sfwc = true
-		case ' ', '\t', '\n', '\r', '\f':
-			return false
-		}
-	}
-	return true
-}
-
-func (p *NatsEventManager) addContextVariableByArgumentRef(argumentRef int, argumentPath []string) (string, error) {
-	variablePath, err := p.visitor.Operation.VariablePathByArgumentRefAndArgumentPath(argumentRef, argumentPath, p.visitor.Walker.Ancestors[0].Ref)
-	if err != nil {
-		return "", err
-	}
-	/* The definition is passed as both definition and operation below because getJSONRootType resolves the type
-	 * from the first argument, but finalInputValueTypeRef comes from the definition
-	 */
-	contextVariable := &resolve.ContextVariable{
-		Path:     variablePath,
-		Renderer: resolve.NewPlainVariableRenderer(),
-	}
-	variablePlaceHolder, _ := p.variables.AddVariable(contextVariable)
-	return variablePlaceHolder, nil
-}
-
-func (p *NatsEventManager) extractEventSubject(fieldRef int, subject string) (string, error) {
-	matches := argument_templates.ArgumentTemplateRegex.FindAllStringSubmatch(subject, -1)
-	// If no argument templates are defined, there are only static values
-	if len(matches) < 1 {
-		if isValidNatsSubject(subject) {
-			return subject, nil
-		}
-		return "", fmt.Errorf(`subject "%s" is not a valid NATS subject`, subject)
-	}
-	fieldNameBytes := p.visitor.Operation.FieldNameBytes(fieldRef)
-	// TODO: handling for interfaces and unions
-	fieldDefinitionRef, ok := p.visitor.Definition.ObjectTypeDefinitionFieldWithName(p.visitor.Walker.EnclosingTypeDefinition.Ref, fieldNameBytes)
-	if !ok {
-		return "", fmt.Errorf(`expected field definition to exist for field "%s"`, fieldNameBytes)
-	}
-	subjectWithVariableTemplateReplacements := subject
-	for templateNumber, groups := range matches {
-		// The first group is the whole template; the second is the period delimited argument path
-		if len(groups) != 2 {
-			return "", fmt.Errorf(`argument template #%d defined on field "%s" is invalid: expected 2 matching groups but received %d`, templateNumber+1, fieldNameBytes, len(groups)-1)
-		}
-		validationResult, err := argument_templates.ValidateArgumentPath(p.visitor.Definition, groups[1], fieldDefinitionRef)
-		if err != nil {
-			return "", fmt.Errorf(`argument template #%d defined on field "%s" is invalid: %w`, templateNumber+1, fieldNameBytes, err)
-		}
-		argumentNameBytes := []byte(validationResult.ArgumentPath[0])
-		argumentRef, ok := p.visitor.Operation.FieldArgument(fieldRef, argumentNameBytes)
-		if !ok {
-			return "", fmt.Errorf(`operation field "%s" does not define argument "%s"`, fieldNameBytes, argumentNameBytes)
-		}
-		// variablePlaceholder has the form $$0$$, $$1$$, etc.
-		variablePlaceholder, err := p.addContextVariableByArgumentRef(argumentRef, validationResult.ArgumentPath)
-		if err != nil {
-			return "", fmt.Errorf(`failed to retrieve variable placeholder for argument ""%s" defined on operation field "%s": %w`, argumentNameBytes, fieldNameBytes, err)
-		}
-		// Replace the template literal with the variable placeholder (and reuse the variable if it already exists)
-		subjectWithVariableTemplateReplacements = strings.ReplaceAll(subjectWithVariableTemplateReplacements, groups[0], variablePlaceholder)
-	}
-	// Substitute the variable templates for dummy values to check naïvely that the string is a valid NATS subject
-	if isValidNatsSubject(variableTemplateRegex.ReplaceAllLiteralString(subjectWithVariableTemplateReplacements, "a")) {
-		return subjectWithVariableTemplateReplacements, nil
-	}
-	return "", fmt.Errorf(`subject "%s" is not a valid NATS subject`, subject)
-}
-
-func (p *NatsEventManager) eventDataBytes(ref int) ([]byte, error) {
-	return buildEventDataBytes(ref, p.visitor, p.variables)
-}
-
-func (p *NatsEventManager) handlePublishAndRequestEvent(ref int) {
-	if len(p.eventConfiguration.Subjects) != 1 {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("publish and request events should define one subject but received %d", len(p.eventConfiguration.Subjects)))
-		return
-	}
-	rawSubject := p.eventConfiguration.Subjects[0]
-	extractedSubject, err := p.extractEventSubject(ref, rawSubject)
-	if err != nil {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("could not extract event subject: %w", err))
-		return
-	}
-	dataBytes, err := p.eventDataBytes(ref)
-	if err != nil {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to write event data bytes: %w", err))
-		return
-	}
-
-	p.publishAndRequestEventConfiguration = &NatsPublishAndRequestEventConfiguration{
-		ProviderID: p.eventMetadata.ProviderID,
-		Subject:    extractedSubject,
-		Data:       dataBytes,
-	}
-}
-
-func (p *NatsEventManager) handleSubscriptionEvent(ref int) {
-
-	if len(p.eventConfiguration.Subjects) == 0 {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("expected at least one subscription subject but received %d", len(p.eventConfiguration.Subjects)))
-		return
-	}
-	extractedSubjects := make([]string, 0, len(p.eventConfiguration.Subjects))
-	for _, rawSubject := range p.eventConfiguration.Subjects {
-		extractedSubject, err := p.extractEventSubject(ref, rawSubject)
-		if err != nil {
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("could not extract subscription event subjects: %w", err))
-			return
-		}
-		extractedSubjects = append(extractedSubjects, extractedSubject)
-	}
-
-	slices.Sort(extractedSubjects)
-
-	p.subscriptionEventConfiguration = &NatsSubscriptionEventConfiguration{
-		ProviderID:          p.eventMetadata.ProviderID,
-		Subjects:            extractedSubjects,
-		StreamConfiguration: p.eventConfiguration.StreamConfiguration,
-	}
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
deleted file mode 100644
index bdf1230..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource.go
+++ /dev/null
@@ -1,346 +0,0 @@
-package pubsub_datasource
-
-import (
-	"bytes"
-	"context"
-	"encoding/json"
-	"fmt"
-	"regexp"
-	"strings"
-
-	"github.com/jensneuse/abstractlogger"
-
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/ast"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-type EventType string
-
-const (
-	EventTypePublish   EventType = "publish"
-	EventTypeRequest   EventType = "request"
-	EventTypeSubscribe EventType = "subscribe"
-)
-
-var eventSubjectRegex = regexp.MustCompile(`{{ args.([a-zA-Z0-9_]+) }}`)
-
-func EventTypeFromString(s string) (EventType, error) {
-	et := EventType(strings.ToLower(s))
-	switch et {
-	case EventTypePublish, EventTypeRequest, EventTypeSubscribe:
-		return et, nil
-	default:
-		return "", fmt.Errorf("invalid event type: %q", s)
-	}
-}
-
-type EventMetadata struct {
-	ProviderID string    `json:"providerId"`
-	Type       EventType `json:"type"`
-	TypeName   string    `json:"typeName"`
-	FieldName  string    `json:"fieldName"`
-}
-
-type EventConfiguration struct {
-	Metadata      *EventMetadata `json:"metadata"`
-	Configuration any            `json:"configuration"`
-}
-
-type Configuration struct {
-	Events []EventConfiguration `json:"events"`
-}
-
-type Planner[T Configuration] struct {
-	id                      int
-	config                  Configuration
-	natsPubSubByProviderID  map[string]NatsPubSub
-	kafkaPubSubByProviderID map[string]KafkaPubSub
-	eventManager            any
-	rootFieldRef            int
-	variables               resolve.Variables
-	visitor                 *plan.Visitor
-}
-
-func (p *Planner[T]) SetID(id int) {
-	p.id = id
-}
-
-func (p *Planner[T]) ID() (id int) {
-	return p.id
-}
-
-func (p *Planner[T]) EnterField(ref int) {
-	if p.rootFieldRef != -1 {
-		// This is a nested field; nothing needs to be done
-		return
-	}
-	p.rootFieldRef = ref
-
-	fieldName := p.visitor.Operation.FieldNameString(ref)
-	typeName := p.visitor.Walker.EnclosingTypeDefinition.NameString(p.visitor.Definition)
-
-	var eventConfig *EventConfiguration
-	for _, cfg := range p.config.Events {
-		if cfg.Metadata.TypeName == typeName && cfg.Metadata.FieldName == fieldName {
-			eventConfig = &cfg
-			break
-		}
-	}
-	if eventConfig == nil {
-		return
-	}
-
-	switch v := eventConfig.Configuration.(type) {
-	case *NatsEventConfiguration:
-		em := &NatsEventManager{
-			visitor:            p.visitor,
-			variables:          &p.variables,
-			eventMetadata:      *eventConfig.Metadata,
-			eventConfiguration: v,
-		}
-		p.eventManager = em
-
-		switch eventConfig.Metadata.Type {
-		case EventTypePublish, EventTypeRequest:
-			em.handlePublishAndRequestEvent(ref)
-		case EventTypeSubscribe:
-			em.handleSubscriptionEvent(ref)
-		default:
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("invalid EventType \"%s\" for Nats", eventConfig.Metadata.Type))
-		}
-	case *KafkaEventConfiguration:
-		em := &KafkaEventManager{
-			visitor:            p.visitor,
-			variables:          &p.variables,
-			eventMetadata:      *eventConfig.Metadata,
-			eventConfiguration: v,
-		}
-		p.eventManager = em
-
-		switch eventConfig.Metadata.Type {
-		case EventTypePublish:
-			em.handlePublishEvent(ref)
-		case EventTypeSubscribe:
-			em.handleSubscriptionEvent(ref)
-		default:
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("invalid EventType \"%s\" for Kafka", eventConfig.Metadata.Type))
-		}
-	default:
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("invalid event configuration type: %T", v))
-	}
-}
-
-func (p *Planner[T]) EnterDocument(_, _ *ast.Document) {
-	p.rootFieldRef = -1
-	p.eventManager = nil
-}
-
-func (p *Planner[T]) Register(visitor *plan.Visitor, configuration plan.DataSourceConfiguration[T], dataSourcePlannerConfiguration plan.DataSourcePlannerConfiguration) error {
-	p.visitor = visitor
-	visitor.Walker.RegisterEnterFieldVisitor(p)
-	visitor.Walker.RegisterEnterDocumentVisitor(p)
-	p.config = Configuration(configuration.CustomConfiguration())
-	return nil
-}
-
-func (p *Planner[T]) ConfigureFetch() resolve.FetchConfiguration {
-	if p.eventManager == nil {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure fetch: event manager is nil"))
-		return resolve.FetchConfiguration{}
-	}
-
-	var dataSource resolve.DataSource
-
-	switch v := p.eventManager.(type) {
-	case *NatsEventManager:
-		pubsub, ok := p.natsPubSubByProviderID[v.eventMetadata.ProviderID]
-		if !ok {
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("no pubsub connection exists with provider id \"%s\"", v.eventMetadata.ProviderID))
-			return resolve.FetchConfiguration{}
-		}
-
-		switch v.eventMetadata.Type {
-		case EventTypePublish:
-			dataSource = &NatsPublishDataSource{
-				pubSub: pubsub,
-			}
-		case EventTypeRequest:
-			dataSource = &NatsRequestDataSource{
-				pubSub: pubsub,
-			}
-		default:
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure fetch: invalid event type \"%s\" for Nats", v.eventMetadata.Type))
-			return resolve.FetchConfiguration{}
-		}
-
-		return resolve.FetchConfiguration{
-			Input:      v.publishAndRequestEventConfiguration.MarshalJSONTemplate(),
-			Variables:  p.variables,
-			DataSource: dataSource,
-			PostProcessing: resolve.PostProcessingConfiguration{
-				MergePath: []string{v.eventMetadata.FieldName},
-			},
-		}
-
-	case *KafkaEventManager:
-		pubsub, ok := p.kafkaPubSubByProviderID[v.eventMetadata.ProviderID]
-		if !ok {
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("no pubsub connection exists with provider id \"%s\"", v.eventMetadata.ProviderID))
-			return resolve.FetchConfiguration{}
-		}
-
-		switch v.eventMetadata.Type {
-		case EventTypePublish:
-			dataSource = &KafkaPublishDataSource{
-				pubSub: pubsub,
-			}
-		case EventTypeRequest:
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("event type \"%s\" is not supported for Kafka", v.eventMetadata.Type))
-			return resolve.FetchConfiguration{}
-		default:
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure fetch: invalid event type \"%s\" for Kafka", v.eventMetadata.Type))
-			return resolve.FetchConfiguration{}
-		}
-
-		return resolve.FetchConfiguration{
-			Input:      v.publishEventConfiguration.MarshalJSONTemplate(),
-			Variables:  p.variables,
-			DataSource: dataSource,
-			PostProcessing: resolve.PostProcessingConfiguration{
-				MergePath: []string{v.eventMetadata.FieldName},
-			},
-		}
-
-	default:
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure fetch: invalid event manager type: %T", p.eventManager))
-	}
-
-	return resolve.FetchConfiguration{}
-}
-
-func (p *Planner[T]) ConfigureSubscription() plan.SubscriptionConfiguration {
-	if p.eventManager == nil {
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure subscription: event manager is nil"))
-		return plan.SubscriptionConfiguration{}
-	}
-
-	switch v := p.eventManager.(type) {
-	case *NatsEventManager:
-		pubsub, ok := p.natsPubSubByProviderID[v.eventMetadata.ProviderID]
-		if !ok {
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("no pubsub connection exists with provider id \"%s\"", v.eventMetadata.ProviderID))
-			return plan.SubscriptionConfiguration{}
-		}
-		object, err := json.Marshal(v.subscriptionEventConfiguration)
-		if err != nil {
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to marshal event subscription streamConfiguration"))
-			return plan.SubscriptionConfiguration{}
-		}
-		return plan.SubscriptionConfiguration{
-			Input:     string(object),
-			Variables: p.variables,
-			DataSource: &NatsSubscriptionSource{
-				pubSub: pubsub,
-			},
-			PostProcessing: resolve.PostProcessingConfiguration{
-				MergePath: []string{v.eventMetadata.FieldName},
-			},
-		}
-	case *KafkaEventManager:
-		pubsub, ok := p.kafkaPubSubByProviderID[v.eventMetadata.ProviderID]
-		if !ok {
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("no pubsub connection exists with provider id \"%s\"", v.eventMetadata.ProviderID))
-			return plan.SubscriptionConfiguration{}
-		}
-		object, err := json.Marshal(v.subscriptionEventConfiguration)
-		if err != nil {
-			p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to marshal event subscription streamConfiguration"))
-			return plan.SubscriptionConfiguration{}
-		}
-		return plan.SubscriptionConfiguration{
-			Input:     string(object),
-			Variables: p.variables,
-			DataSource: &KafkaSubscriptionSource{
-				pubSub: pubsub,
-			},
-			PostProcessing: resolve.PostProcessingConfiguration{
-				MergePath: []string{v.eventMetadata.FieldName},
-			},
-		}
-	default:
-		p.visitor.Walker.StopWithInternalErr(fmt.Errorf("failed to configure subscription: invalid event manager type: %T", p.eventManager))
-	}
-
-	return plan.SubscriptionConfiguration{}
-}
-
-func (p *Planner[T]) DownstreamResponseFieldAlias(_ int) (alias string, exists bool) {
-	return "", false
-}
-
-func NewFactory[T Configuration](executionContext context.Context, natsPubSubByProviderID map[string]NatsPubSub, kafkaPubSubByProviderID map[string]KafkaPubSub) *Factory[T] {
-	return &Factory[T]{
-		executionContext:        executionContext,
-		natsPubSubByProviderID:  natsPubSubByProviderID,
-		kafkaPubSubByProviderID: kafkaPubSubByProviderID,
-	}
-}
-
-type Factory[T Configuration] struct {
-	executionContext        context.Context
-	natsPubSubByProviderID  map[string]NatsPubSub
-	kafkaPubSubByProviderID map[string]KafkaPubSub
-}
-
-func (f *Factory[T]) Planner(_ abstractlogger.Logger) plan.DataSourcePlanner[T] {
-	return &Planner[T]{
-		natsPubSubByProviderID:  f.natsPubSubByProviderID,
-		kafkaPubSubByProviderID: f.kafkaPubSubByProviderID,
-	}
-}
-
-func (f *Factory[T]) Context() context.Context {
-	return f.executionContext
-}
-
-func (f *Factory[T]) UpstreamSchema(_ plan.DataSourceConfiguration[T]) (*ast.Document, bool) {
-	return nil, false
-}
-
-func (f *Factory[T]) PlanningBehavior() plan.DataSourcePlanningBehavior {
-	return plan.DataSourcePlanningBehavior{
-		MergeAliasedRootNodes:      false,
-		OverrideFieldPathFromAlias: false,
-		AllowPlanningTypeName:      true,
-	}
-}
-
-func buildEventDataBytes(ref int, visitor *plan.Visitor, variables *resolve.Variables) ([]byte, error) {
-	// Collect the field arguments for fetch based operations
-	fieldArgs := visitor.Operation.FieldArguments(ref)
-	var dataBuffer bytes.Buffer
-	dataBuffer.WriteByte('{')
-	for i, arg := range fieldArgs {
-		if i > 0 {
-			dataBuffer.WriteByte(',')
-		}
-		argValue := visitor.Operation.ArgumentValue(arg)
-		variableName := visitor.Operation.VariableValueNameBytes(argValue.Ref)
-		contextVariable := &resolve.ContextVariable{
-			Path:     []string{string(variableName)},
-			Renderer: resolve.NewJSONVariableRenderer(),
-		}
-		variablePlaceHolder, _ := variables.AddVariable(contextVariable)
-		argumentName := visitor.Operation.ArgumentNameString(arg)
-		escapedKey, err := json.Marshal(argumentName)
-		if err != nil {
-			return nil, err
-		}
-		dataBuffer.Write(escapedKey)
-		dataBuffer.WriteByte(':')
-		dataBuffer.WriteString(variablePlaceHolder)
-	}
-	dataBuffer.WriteByte('}')
-	return dataBuffer.Bytes(), nil
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
deleted file mode 100644
index 28a37df..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_datasource_test.go
+++ /dev/null
@@ -1,669 +0,0 @@
-package pubsub_datasource
-
-import (
-	"bytes"
-	"context"
-	"errors"
-	"io"
-	"testing"
-
-	"github.com/stretchr/testify/require"
-
-	"github.com/wundergraph/astjson"
-
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasourcetesting"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/plan"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/internal/unsafeparser"
-)
-
-type testPubsub struct {
-}
-
-func (t *testPubsub) Subscribe(_ context.Context, _ NatsSubscriptionEventConfiguration, _ resolve.SubscriptionUpdater) error {
-	return errors.New("not implemented")
-}
-func (t *testPubsub) Publish(_ context.Context, _ NatsPublishAndRequestEventConfiguration) error {
-	return errors.New("not implemented")
-}
-
-func (t *testPubsub) Request(_ context.Context, _ NatsPublishAndRequestEventConfiguration, _ io.Writer) error {
-	return errors.New("not implemented")
-}
-
-func TestPubSub(t *testing.T) {
-	factory := &Factory[Configuration]{
-		natsPubSubByProviderID: map[string]NatsPubSub{"default": &testPubsub{}},
-	}
-
-	const schema = `
-	type Query {
-		helloQuery(userKey: UserKey!): User! @edfs__natsRequest(subject: "tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}")
-	}
-
-	type Mutation {
-		helloMutation(userKey: UserKey!): edfs__PublishResult! @edfs__natsPublish(subject: "tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}")
-	}
-
-	type Subscription {
-		helloSubscription(userKey: UserKey!): User! @edfs__natsSubscribe(subjects: ["tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}"])
-		subscriptionWithMultipleSubjects(userKeyOne: UserKey!, userKeyTwo: UserKey!): User! @edfs__natsSubscribe(subjects: ["tenantsOne.{{ args.userKeyOne.tenantId }}.users.{{ args.userKeyOne.id }}", "tenantsTwo.{{ args.userKeyTwo.tenantId }}.users.{{ args.userKeyTwo.id }}"])
-		subscriptionWithStaticValues: User! @edfs__natsSubscribe(subjects: ["tenants.1.users.1"])
-		subscriptionWithArgTemplateAndStaticValue(nestedUserKey: NestedUserKey!): User! @edfs__natsSubscribe(subjects: ["tenants.1.users.{{ args.nestedUserKey.user.id }}"])
-	}
-	
-	type User @key(fields: "id tenant { id }") {
-		id: Int! @external
-		tenant: Tenant! @external
-	}
-
-	type Tenant {
-		id: Int! @external
-	}
-
-	input UserKey {
-		id: Int!
-		tenantId: Int!
-	}
-
-	input NestedUserKey {
-		user: UserInput!
-		tenant: TenantInput!
-	}
-	
-	input UserInput {
-		id: Int!
-	}
-	
-	input TenantInput {
-		id: Int!
-	}
-
-	type edfs__PublishResult {
-		success: Boolean!	
-	}
-
-	input edfs__NatsStreamConfiguration {
-		consumerName: String!
-		streamName: String!
-	}
-	`
-
-	dataSourceCustomConfig := Configuration{
-		Events: []EventConfiguration{
-			{
-				Metadata: &EventMetadata{
-					FieldName:  "helloQuery",
-					ProviderID: "default",
-					Type:       EventTypeRequest,
-					TypeName:   "Query",
-				},
-				Configuration: &NatsEventConfiguration{
-					Subjects: []string{"tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}"},
-				},
-			},
-			{
-				Metadata: &EventMetadata{
-					FieldName:  "helloMutation",
-					ProviderID: "default",
-					Type:       EventTypePublish,
-					TypeName:   "Mutation",
-				},
-				Configuration: &NatsEventConfiguration{
-					Subjects: []string{"tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}"},
-				},
-			},
-			{
-				Metadata: &EventMetadata{
-					FieldName:  "helloSubscription",
-					ProviderID: "default",
-					Type:       EventTypeSubscribe,
-					TypeName:   "Subscription",
-				},
-				Configuration: &NatsEventConfiguration{
-					Subjects: []string{"tenants.{{ args.userKey.tenantId }}.users.{{ args.userKey.id }}"},
-				},
-			},
-			{
-				Metadata: &EventMetadata{
-					FieldName:  "subscriptionWithMultipleSubjects",
-					ProviderID: "default",
-					Type:       EventTypeSubscribe,
-					TypeName:   "Subscription",
-				},
-				Configuration: &NatsEventConfiguration{
-					Subjects: []string{"tenantsOne.{{ args.userKeyOne.tenantId }}.users.{{ args.userKeyOne.id }}", "tenantsTwo.{{ args.userKeyTwo.tenantId }}.users.{{ args.userKeyTwo.id }}"},
-				},
-			},
-			{
-				Metadata: &EventMetadata{
-					FieldName:  "subscriptionWithStaticValues",
-					ProviderID: "default",
-					Type:       EventTypeSubscribe,
-					TypeName:   "Subscription",
-				},
-				Configuration: &NatsEventConfiguration{
-					Subjects: []string{"tenants.1.users.1"},
-				},
-			},
-			{
-				Metadata: &EventMetadata{
-					FieldName:  "subscriptionWithArgTemplateAndStaticValue",
-					ProviderID: "default",
-					Type:       EventTypeSubscribe,
-					TypeName:   "Subscription",
-				},
-				Configuration: &NatsEventConfiguration{
-					Subjects: []string{"tenants.1.users.{{ args.nestedUserKey.user.id }}"},
-				},
-			},
-		},
-	}
-
-	dataSourceConfiguration, err := plan.NewDataSourceConfiguration[Configuration](
-		"test",
-		factory,
-		&plan.DataSourceMetadata{
-			RootNodes: []plan.TypeField{
-				{
-					TypeName:   "Query",
-					FieldNames: []string{"helloQuery"},
-				},
-				{
-					TypeName:   "Mutation",
-					FieldNames: []string{"helloMutation"},
-				},
-				{
-					TypeName:   "Subscription",
-					FieldNames: []string{"helloSubscription"},
-				},
-				{
-					TypeName:   "Subscription",
-					FieldNames: []string{"subscriptionWithMultipleSubjects"},
-				},
-				{
-					TypeName:   "Subscription",
-					FieldNames: []string{"subscriptionWithStaticValues"},
-				},
-				{
-					TypeName:   "Subscription",
-					FieldNames: []string{"subscriptionWithArgTemplateAndStaticValue"},
-				},
-			},
-			ChildNodes: []plan.TypeField{
-				// Entities are child nodes in pubsub datasources
-				{
-					TypeName:   "User",
-					FieldNames: []string{"id", "tenant"},
-				},
-				{
-					TypeName:   "Tenant",
-					FieldNames: []string{"id"},
-				},
-				{
-					TypeName:   "edfs__PublishResult",
-					FieldNames: []string{"success"},
-				},
-			},
-		},
-		dataSourceCustomConfig,
-	)
-	require.NoError(t, err)
-
-	planConfig := plan.Configuration{
-		DataSources: []plan.DataSource{
-			dataSourceConfiguration,
-		},
-		Fields: []plan.FieldConfiguration{
-			{
-				TypeName:  "Query",
-				FieldName: "helloQuery",
-				Arguments: []plan.ArgumentConfiguration{
-					{
-						Name:       "userKey",
-						SourceType: plan.FieldArgumentSource,
-					},
-				},
-			},
-			{
-				TypeName:  "Mutation",
-				FieldName: "helloMutation",
-				Arguments: []plan.ArgumentConfiguration{
-					{
-						Name:       "userKey",
-						SourceType: plan.FieldArgumentSource,
-					},
-				},
-			},
-			{
-				TypeName:  "Subscription",
-				FieldName: "helloSubscription",
-				Arguments: []plan.ArgumentConfiguration{
-					{
-						Name:       "userKey",
-						SourceType: plan.FieldArgumentSource,
-					},
-				},
-			},
-			{
-				TypeName:  "Subscription",
-				FieldName: "subscriptionWithMultipleSubjects",
-				Arguments: []plan.ArgumentConfiguration{
-					{
-						Name:       "userKeyOne",
-						SourceType: plan.FieldArgumentSource,
-					},
-					{
-						Name:       "userKeyTwo",
-						SourceType: plan.FieldArgumentSource,
-					},
-				},
-			},
-			{
-				TypeName:  "Subscription",
-				FieldName: "subscriptionWithArgTemplateAndStaticValue",
-				Arguments: []plan.ArgumentConfiguration{
-					{
-						Name:       "nestedUserKey",
-						SourceType: plan.FieldArgumentSource,
-					},
-				},
-			},
-		},
-		DisableResolveFieldPositions: true,
-	}
-
-	t.Run("query", func(t *testing.T) {
-		const operation = "query HelloQuery { helloQuery(userKey:{id:42,tenantId:3}) { id } }"
-		const operationName = `HelloQuery`
-		expect := &plan.SynchronousResponsePlan{
-			Response: &resolve.GraphQLResponse{
-				RawFetches: []*resolve.FetchItem{
-					{
-						Fetch: &resolve.SingleFetch{
-							FetchConfiguration: resolve.FetchConfiguration{
-								Input: `{"subject":"tenants.$$0$$.users.$$1$$", "data": {"userKey":$$2$$}, "providerId":"default"}`,
-								Variables: resolve.Variables{
-									&resolve.ContextVariable{
-										Path:     []string{"a", "tenantId"},
-										Renderer: resolve.NewPlainVariableRenderer(),
-									},
-									&resolve.ContextVariable{
-										Path:     []string{"a", "id"},
-										Renderer: resolve.NewPlainVariableRenderer(),
-									},
-									&resolve.ContextVariable{
-										Path:     []string{"a"},
-										Renderer: resolve.NewJSONVariableRenderer(),
-									},
-								},
-								DataSource: &NatsRequestDataSource{
-									pubSub: &testPubsub{},
-								},
-								PostProcessing: resolve.PostProcessingConfiguration{
-									MergePath: []string{"helloQuery"},
-								},
-							},
-							DataSourceIdentifier: []byte("pubsub_datasource.NatsRequestDataSource"),
-						},
-					},
-				},
-				Data: &resolve.Object{
-					Fields: []*resolve.Field{
-						{
-							Name: []byte("helloQuery"),
-							Value: &resolve.Object{
-								Path:     []string{"helloQuery"},
-								Nullable: false,
-								PossibleTypes: map[string]struct{}{
-									"User": {},
-								},
-								TypeName: "User",
-								Fields: []*resolve.Field{
-									{
-										Name: []byte("id"),
-										Value: &resolve.Integer{
-											Path:     []string{"id"},
-											Nullable: false,
-										},
-									},
-								},
-							},
-						},
-					},
-				},
-			},
-		}
-		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
-	})
-
-	t.Run("mutation", func(t *testing.T) {
-		const operation = "mutation HelloMutation { helloMutation(userKey:{id:42,tenantId:3}) { success } }"
-		const operationName = `HelloMutation`
-		expect := &plan.SynchronousResponsePlan{
-			Response: &resolve.GraphQLResponse{
-				RawFetches: []*resolve.FetchItem{
-					{
-						Fetch: &resolve.SingleFetch{
-							FetchConfiguration: resolve.FetchConfiguration{
-								Input: `{"subject":"tenants.$$0$$.users.$$1$$", "data": {"userKey":$$2$$}, "providerId":"default"}`,
-								Variables: resolve.Variables{
-									&resolve.ContextVariable{
-										Path:     []string{"a", "tenantId"},
-										Renderer: resolve.NewPlainVariableRenderer(),
-									},
-									&resolve.ContextVariable{
-										Path:     []string{"a", "id"},
-										Renderer: resolve.NewPlainVariableRenderer(),
-									},
-									&resolve.ContextVariable{
-										Path:     []string{"a"},
-										Renderer: resolve.NewJSONVariableRenderer(),
-									},
-								},
-								DataSource: &NatsPublishDataSource{
-									pubSub: &testPubsub{},
-								},
-								PostProcessing: resolve.PostProcessingConfiguration{
-									MergePath: []string{"helloMutation"},
-								},
-							},
-							DataSourceIdentifier: []byte("pubsub_datasource.NatsPublishDataSource"),
-						},
-					},
-				},
-				Data: &resolve.Object{
-					Fields: []*resolve.Field{
-						{
-							Name: []byte("helloMutation"),
-							Value: &resolve.Object{
-								Path:     []string{"helloMutation"},
-								Nullable: false,
-								PossibleTypes: map[string]struct{}{
-									"edfs__PublishResult": {},
-								},
-								TypeName: "edfs__PublishResult",
-								Fields: []*resolve.Field{
-									{
-										Name: []byte("success"),
-										Value: &resolve.Boolean{
-											Path:     []string{"success"},
-											Nullable: false,
-										},
-									},
-								},
-							},
-						},
-					},
-				},
-			},
-		}
-		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
-	})
-
-	t.Run("subscription", func(t *testing.T) {
-		const operation = "subscription HelloSubscription { helloSubscription(userKey:{id:42,tenantId:3}) { id } }"
-		const operationName = `HelloSubscription`
-		expect := &plan.SubscriptionResponsePlan{
-			Response: &resolve.GraphQLSubscription{
-				Trigger: resolve.GraphQLSubscriptionTrigger{
-					Input: []byte(`{"providerId":"default","subjects":["tenants.$$0$$.users.$$1$$"]}`),
-					Variables: resolve.Variables{
-						&resolve.ContextVariable{
-							Path:     []string{"a", "tenantId"},
-							Renderer: resolve.NewPlainVariableRenderer(),
-						},
-						&resolve.ContextVariable{
-							Path:     []string{"a", "id"},
-							Renderer: resolve.NewPlainVariableRenderer(),
-						},
-					},
-					Source: &NatsSubscriptionSource{
-						pubSub: &testPubsub{},
-					},
-					PostProcessing: resolve.PostProcessingConfiguration{
-						MergePath: []string{"helloSubscription"},
-					},
-				},
-				Response: &resolve.GraphQLResponse{
-					Data: &resolve.Object{
-						Fields: []*resolve.Field{
-							{
-								Name: []byte("helloSubscription"),
-								Value: &resolve.Object{
-									Path:     []string{"helloSubscription"},
-									Nullable: false,
-									PossibleTypes: map[string]struct{}{
-										"User": {},
-									},
-									TypeName: "User",
-									Fields: []*resolve.Field{
-										{
-											Name: []byte("id"),
-											Value: &resolve.Integer{
-												Path:     []string{"id"},
-												Nullable: false,
-											},
-										},
-									},
-								},
-							},
-						},
-					},
-				},
-			},
-		}
-		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
-	})
-
-	t.Run("subscription with multiple subjects", func(t *testing.T) {
-		const operation = "subscription SubscriptionWithMultipleSubjects { subscriptionWithMultipleSubjects(userKeyOne:{id:42,tenantId:3},userKeyTwo:{id:24,tenantId:99}) { id } }"
-		const operationName = `SubscriptionWithMultipleSubjects`
-		expect := &plan.SubscriptionResponsePlan{
-			Response: &resolve.GraphQLSubscription{
-				Trigger: resolve.GraphQLSubscriptionTrigger{
-					Input: []byte(`{"providerId":"default","subjects":["tenantsOne.$$0$$.users.$$1$$","tenantsTwo.$$2$$.users.$$3$$"]}`),
-					Variables: resolve.Variables{
-						&resolve.ContextVariable{
-							Path:     []string{"a", "tenantId"},
-							Renderer: resolve.NewPlainVariableRenderer(),
-						},
-						&resolve.ContextVariable{
-							Path:     []string{"a", "id"},
-							Renderer: resolve.NewPlainVariableRenderer(),
-						},
-						&resolve.ContextVariable{
-							Path:     []string{"b", "tenantId"},
-							Renderer: resolve.NewPlainVariableRenderer(),
-						},
-						&resolve.ContextVariable{
-							Path:     []string{"b", "id"},
-							Renderer: resolve.NewPlainVariableRenderer(),
-						},
-					},
-					Source: &NatsSubscriptionSource{
-						pubSub: &testPubsub{},
-					},
-					PostProcessing: resolve.PostProcessingConfiguration{
-						MergePath: []string{"subscriptionWithMultipleSubjects"},
-					},
-				},
-				Response: &resolve.GraphQLResponse{
-					Data: &resolve.Object{
-						Fields: []*resolve.Field{
-							{
-								Name: []byte("subscriptionWithMultipleSubjects"),
-								Value: &resolve.Object{
-									Path:     []string{"subscriptionWithMultipleSubjects"},
-									Nullable: false,
-									PossibleTypes: map[string]struct{}{
-										"User": {},
-									},
-									TypeName: "User",
-									Fields: []*resolve.Field{
-										{
-											Name: []byte("id"),
-											Value: &resolve.Integer{
-												Path:     []string{"id"},
-												Nullable: false,
-											},
-										},
-									},
-								},
-							},
-						},
-					},
-				},
-			},
-		}
-		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
-	})
-
-	t.Run("subscription with only static values", func(t *testing.T) {
-		const operation = "subscription SubscriptionWithStaticValues { subscriptionWithStaticValues { id } }"
-		const operationName = `SubscriptionWithStaticValues`
-		expect := &plan.SubscriptionResponsePlan{
-			Response: &resolve.GraphQLSubscription{
-				Trigger: resolve.GraphQLSubscriptionTrigger{
-					Input: []byte(`{"providerId":"default","subjects":["tenants.1.users.1"]}`),
-					Source: &NatsSubscriptionSource{
-						pubSub: &testPubsub{},
-					},
-					PostProcessing: resolve.PostProcessingConfiguration{
-						MergePath: []string{"subscriptionWithStaticValues"},
-					},
-				},
-				Response: &resolve.GraphQLResponse{
-					Data: &resolve.Object{
-						Fields: []*resolve.Field{
-							{
-								Name: []byte("subscriptionWithStaticValues"),
-								Value: &resolve.Object{
-									Path:     []string{"subscriptionWithStaticValues"},
-									Nullable: false,
-									PossibleTypes: map[string]struct{}{
-										"User": {},
-									},
-									TypeName: "User",
-									Fields: []*resolve.Field{
-										{
-											Name: []byte("id"),
-											Value: &resolve.Integer{
-												Path:     []string{"id"},
-												Nullable: false,
-											},
-										},
-									},
-								},
-							},
-						},
-					},
-				},
-			},
-		}
-		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
-	})
-
-	t.Run("subscription with deeply nested argument and static value", func(t *testing.T) {
-		const operation = "subscription SubscriptionWithArgTemplateAndStaticValue { subscriptionWithArgTemplateAndStaticValue(nestedUserKey: { user: { id: 44, tenantId: 2 } }) { id } }"
-		const operationName = `SubscriptionWithArgTemplateAndStaticValue`
-		expect := &plan.SubscriptionResponsePlan{
-			Response: &resolve.GraphQLSubscription{
-				Trigger: resolve.GraphQLSubscriptionTrigger{
-					Input: []byte(`{"providerId":"default","subjects":["tenants.1.users.$$0$$"]}`),
-					Variables: resolve.Variables{
-						&resolve.ContextVariable{
-							Path:     []string{"a", "user", "id"},
-							Renderer: resolve.NewPlainVariableRenderer(),
-						},
-					},
-					Source: &NatsSubscriptionSource{
-						pubSub: &testPubsub{},
-					},
-					PostProcessing: resolve.PostProcessingConfiguration{
-						MergePath: []string{"subscriptionWithArgTemplateAndStaticValue"},
-					},
-				},
-				Response: &resolve.GraphQLResponse{
-					Data: &resolve.Object{
-						Fields: []*resolve.Field{
-							{
-								Name: []byte("subscriptionWithArgTemplateAndStaticValue"),
-								Value: &resolve.Object{
-									Path:     []string{"subscriptionWithArgTemplateAndStaticValue"},
-									Nullable: false,
-									PossibleTypes: map[string]struct{}{
-										"User": {},
-									},
-									TypeName: "User",
-									Fields: []*resolve.Field{
-										{
-											Name: []byte("id"),
-											Value: &resolve.Integer{
-												Path:     []string{"id"},
-												Nullable: false,
-											},
-										},
-									},
-								},
-							},
-						},
-					},
-				},
-			},
-		}
-		datasourcetesting.RunTest(schema, operation, operationName, expect, planConfig)(t)
-	})
-}
-
-func TestBuildEventDataBytes(t *testing.T) {
-	t.Run("check string serialization", func(t *testing.T) {
-		const operation = "mutation HelloMutation($id: ID!) { helloMutation(userKey:{id:$id,tenantId:3}) { success } }"
-		op := unsafeparser.ParseGraphqlDocumentString(operation)
-		var vars resolve.Variables
-		visitor := plan.Visitor{
-			Operation: &op,
-		}
-		_, err := buildEventDataBytes(1, &visitor, &vars)
-		require.NoError(t, err)
-		require.Len(t, vars, 1)
-
-		template := resolve.InputTemplate{
-			Segments: []resolve.TemplateSegment{
-				vars[0].TemplateSegment(),
-			},
-		}
-		ctx := &resolve.Context{
-			Variables: astjson.MustParseBytes([]byte(`{"id":"asdf"}`)),
-		}
-		buf := &bytes.Buffer{}
-		err = template.Render(ctx, nil, buf)
-		require.NoError(t, err)
-		require.Equal(t, `"asdf"`, buf.String())
-	})
-
-	t.Run("check int serialization", func(t *testing.T) {
-		const operation = "mutation HelloMutation($id: Int!) { helloMutation(userKey:{id:$id,tenantId:3}) { success } }"
-		op := unsafeparser.ParseGraphqlDocumentString(operation)
-		var vars resolve.Variables
-		visitor := plan.Visitor{
-			Operation: &op,
-		}
-		_, err := buildEventDataBytes(1, &visitor, &vars)
-		require.NoError(t, err)
-		require.Len(t, vars, 1)
-
-		template := resolve.InputTemplate{
-			Segments: []resolve.TemplateSegment{
-				vars[0].TemplateSegment(),
-			},
-		}
-		ctx := &resolve.Context{
-			Variables: astjson.MustParseBytes([]byte(`{"id":5}`)),
-		}
-		buf := &bytes.Buffer{}
-		err = template.Render(ctx, nil, buf)
-		require.NoError(t, err)
-		require.Equal(t, `5`, buf.String())
-	})
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go
deleted file mode 100644
index cc562b8..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_kafka.go
+++ /dev/null
@@ -1,88 +0,0 @@
-package pubsub_datasource
-
-import (
-	"bytes"
-	"context"
-	"encoding/json"
-	"io"
-
-	"github.com/buger/jsonparser"
-	"github.com/cespare/xxhash/v2"
-
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-type KafkaEventConfiguration struct {
-	Topics []string `json:"topics"`
-}
-
-type KafkaConnector interface {
-	New(ctx context.Context) KafkaPubSub
-}
-
-// KafkaPubSub describe the interface that implements the primitive operations for pubsub
-type KafkaPubSub interface {
-	// Subscribe starts listening on the given subjects and sends the received messages to the given next channel
-	Subscribe(ctx context.Context, config KafkaSubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error
-	// Publish sends the given data to the given subject
-	Publish(ctx context.Context, config KafkaPublishEventConfiguration) error
-}
-
-type KafkaSubscriptionSource struct {
-	pubSub KafkaPubSub
-}
-
-func (s *KafkaSubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) error {
-
-	val, _, _, err := jsonparser.Get(input, "topics")
-	if err != nil {
-		return err
-	}
-
-	_, err = xxh.Write(val)
-	if err != nil {
-		return err
-	}
-
-	val, _, _, err = jsonparser.Get(input, "providerId")
-	if err != nil {
-		return err
-	}
-
-	_, err = xxh.Write(val)
-	return err
-}
-
-func (s *KafkaSubscriptionSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error {
-	var subscriptionConfiguration KafkaSubscriptionEventConfiguration
-	err := json.Unmarshal(input, &subscriptionConfiguration)
-	if err != nil {
-		return err
-	}
-
-	return s.pubSub.Subscribe(ctx.Context(), subscriptionConfiguration, updater)
-}
-
-type KafkaPublishDataSource struct {
-	pubSub KafkaPubSub
-}
-
-func (s *KafkaPublishDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error {
-	var publishConfiguration KafkaPublishEventConfiguration
-	err := json.Unmarshal(input, &publishConfiguration)
-	if err != nil {
-		return err
-	}
-
-	if err := s.pubSub.Publish(ctx, publishConfiguration); err != nil {
-		_, err = io.WriteString(out, `{"success": false}`)
-		return err
-	}
-	_, err = io.WriteString(out, `{"success": true}`)
-	return err
-}
-
-func (s *KafkaPublishDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) (err error) {
-	panic("not implemented")
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go b/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go
deleted file mode 100644
index 31cb6d4..0000000
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/datasource/pubsub_datasource/pubsub_nats.go
+++ /dev/null
@@ -1,116 +0,0 @@
-package pubsub_datasource
-
-import (
-	"bytes"
-	"context"
-	"encoding/json"
-	"io"
-
-	"github.com/buger/jsonparser"
-	"github.com/cespare/xxhash/v2"
-
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/datasource/httpclient"
-	"github.com/wundergraph/graphql-go-tools/v2/pkg/engine/resolve"
-)
-
-type NatsStreamConfiguration struct {
-	Consumer                  string `json:"consumer"`
-	ConsumerInactiveThreshold int32  `json:"consumerInactiveThreshold"`
-	StreamName                string `json:"streamName"`
-}
-
-type NatsEventConfiguration struct {
-	StreamConfiguration *NatsStreamConfiguration `json:"streamConfiguration,omitempty"`
-	Subjects            []string                 `json:"subjects"`
-}
-
-type NatsConnector interface {
-	New(ctx context.Context) NatsPubSub
-}
-
-// NatsPubSub describe the interface that implements the primitive operations for pubsub
-type NatsPubSub interface {
-	// Subscribe starts listening on the given subjects and sends the received messages to the given next channel
-	Subscribe(ctx context.Context, event NatsSubscriptionEventConfiguration, updater resolve.SubscriptionUpdater) error
-	// Publish sends the given data to the given subject
-	Publish(ctx context.Context, event NatsPublishAndRequestEventConfiguration) error
-	// Request sends a request on the given subject and writes the response to the given writer
-	Request(ctx context.Context, event NatsPublishAndRequestEventConfiguration, w io.Writer) error
-}
-
-type NatsSubscriptionSource struct {
-	pubSub NatsPubSub
-}
-
-func (s *NatsSubscriptionSource) UniqueRequestID(ctx *resolve.Context, input []byte, xxh *xxhash.Digest) error {
-
-	val, _, _, err := jsonparser.Get(input, "subjects")
-	if err != nil {
-		return err
-	}
-
-	_, err = xxh.Write(val)
-	if err != nil {
-		return err
-	}
-
-	val, _, _, err = jsonparser.Get(input, "providerId")
-	if err != nil {
-		return err
-	}
-
-	_, err = xxh.Write(val)
-	return err
-}
-
-func (s *NatsSubscriptionSource) Start(ctx *resolve.Context, input []byte, updater resolve.SubscriptionUpdater) error {
-	var subscriptionConfiguration NatsSubscriptionEventConfiguration
-	err := json.Unmarshal(input, &subscriptionConfiguration)
-	if err != nil {
-		return err
-	}
-
-	return s.pubSub.Subscribe(ctx.Context(), subscriptionConfiguration, updater)
-}
-
-type NatsPublishDataSource struct {
-	pubSub NatsPubSub
-}
-
-func (s *NatsPublishDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error {
-	var publishConfiguration NatsPublishAndRequestEventConfiguration
-	err := json.Unmarshal(input, &publishConfiguration)
-	if err != nil {
-		return err
-	}
-
-	if err := s.pubSub.Publish(ctx, publishConfiguration); err != nil {
-		_, err = io.WriteString(out, `{"success": false}`)
-		return err
-	}
-
-	_, err = io.WriteString(out, `{"success": true}`)
-	return err
-}
-
-func (s *NatsPublishDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) error {
-	panic("not implemented")
-}
-
-type NatsRequestDataSource struct {
-	pubSub NatsPubSub
-}
-
-func (s *NatsRequestDataSource) Load(ctx context.Context, input []byte, out *bytes.Buffer) error {
-	var subscriptionConfiguration NatsPublishAndRequestEventConfiguration
-	err := json.Unmarshal(input, &subscriptionConfiguration)
-	if err != nil {
-		return err
-	}
-
-	return s.pubSub.Request(ctx, subscriptionConfiguration, out)
-}
-
-func (s *NatsRequestDataSource) LoadWithFiles(ctx context.Context, input []byte, files []*httpclient.FileUpload, out *bytes.Buffer) error {
-	panic("not implemented")
-}
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/resolve/datasource.go b/app/v2/pkg/engine/resolve/datasource.go
index c679d76..e6a6cc8 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/resolve/datasource.go
+++ b/app/v2/pkg/engine/resolve/datasource.go
@@ -21,6 +21,8 @@ type SubscriptionDataSource interface {
 	UniqueRequestID(ctx *Context, input []byte, xxh *xxhash.Digest) (err error)
 }
 
+type SubscriptionStartHook func(ctx *Context, id SubscriptionIdentifier, input []byte, updater SubscriptionUpdater) error
+
 type AsyncSubscriptionDataSource interface {
 	AsyncStart(ctx *Context, id uint64, input []byte, updater SubscriptionUpdater) error
 	AsyncStop(id uint64)
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/resolve/resolve.go b/app/v2/pkg/engine/resolve/resolve.go
index 14d8ad4..f174299 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/resolve/resolve.go
+++ b/app/v2/pkg/engine/resolve/resolve.go
@@ -293,8 +293,9 @@ func (r *Resolver) ResolveGraphQLResponse(ctx *Context, response *GraphQLRespons
 
 type trigger struct {
 	id            uint64
+	ctx           context.Context
 	cancel        context.CancelFunc
-	subscriptions map[*Context]*sub
+	subscriptions map[SubscriptionIdentifier]*sub
 	// initialized is set to true when the trigger is started and initialized
 	initialized bool
 }
@@ -509,12 +510,16 @@ func (r *Resolver) handleEvent(event subscriptionEvent) {
 		r.handleRemoveClient(event.id.ConnectionID)
 	case subscriptionEventKindTriggerUpdate:
 		r.handleTriggerUpdate(event.triggerID, event.data)
+	case subscriptionEventKindTriggerSubscriptionUpdate:
+		r.handleTriggerSubscriptionUpdate(event.triggerID, event.id, event.data)
 	case subscriptionEventKindTriggerComplete:
 		r.handleTriggerComplete(event.triggerID)
 	case subscriptionEventKindTriggerInitialized:
 		r.handleTriggerInitialized(event.triggerID)
 	case subscriptionEventKindTriggerClose:
 		r.handleTriggerClose(event)
+	case subscriptionEventKindTriggerSubscriptionClose:
+		r.handleTriggerSubscriptionClose(event.triggerID, event.id, event.closeKind)
 	case subscriptionEventKindUnknown:
 		panic("unknown event")
 	}
@@ -582,9 +587,7 @@ func (r *Resolver) handleTriggerComplete(triggerID uint64) {
 }
 
 func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription) {
-	var (
-		err error
-	)
+	var err error
 	if r.options.Debug {
 		fmt.Printf("resolver:trigger:subscription:add:%d:%d\n", triggerID, add.id.SubscriptionID)
 	}
@@ -602,82 +605,124 @@ func (r *Resolver) handleAddSubscription(triggerID uint64, add *addSubscription)
 		s.heartbeat = true
 	}
 
-	// Start the dedicated worker goroutine where the subscription updates are processed
-	// and writes are written to the client in a single threaded manner
-	go s.startWorker()
-
 	trig, ok := r.triggers[triggerID]
-	if ok {
-		trig.subscriptions[add.ctx] = s
-		if r.reporter != nil {
-			r.reporter.SubscriptionCountInc(1)
-		}
-		if r.options.Debug {
-			fmt.Printf("resolver:trigger:subscription:added:%d:%d\n", triggerID, add.id.SubscriptionID)
-		}
-		return
-	}
-
+	createdTrigger := false
 	if r.options.Debug {
 		fmt.Printf("resolver:create:trigger:%d\n", triggerID)
 	}
-	ctx, cancel := context.WithCancel(xcontext.Detach(add.ctx.Context()))
-	updater := &subscriptionUpdater{
-		debug:     r.options.Debug,
-		triggerID: triggerID,
-		ch:        r.events,
-		ctx:       ctx,
-	}
-	cloneCtx := add.ctx.clone(ctx)
-	trig = &trigger{
-		id:            triggerID,
-		subscriptions: make(map[*Context]*sub),
-		cancel:        cancel,
+	var (
+		triggerCtx      context.Context
+		cancel          context.CancelFunc
+		updater         *subscriptionUpdater
+		cloneCtx        *Context
+		asyncDataSource AsyncSubscriptionDataSource
+	)
+	if !ok {
+		createdTrigger = true
+		triggerCtx, cancel = context.WithCancel(xcontext.Detach(add.ctx.Context()))
+		updater = &subscriptionUpdater{
+			debug:     r.options.Debug,
+			triggerID: triggerID,
+			ch:        r.events,
+			ctx:       triggerCtx,
+		}
+		cloneCtx = add.ctx.clone(triggerCtx)
+		trig = &trigger{
+			id:            triggerID,
+			ctx:           triggerCtx,
+			subscriptions: make(map[SubscriptionIdentifier]*sub),
+			cancel:        cancel,
+		}
+		r.triggers[triggerID] = trig
+
+		if async, ok := add.resolve.Trigger.Source.(AsyncSubscriptionDataSource); ok {
+			trig.cancel = func() {
+				cancel()
+				async.AsyncStop(triggerID)
+			}
+			asyncDataSource = async
+		}
 	}
-	r.triggers[triggerID] = trig
-	trig.subscriptions[add.ctx] = s
+	trig.subscriptions[add.id] = s
 
 	if r.reporter != nil {
 		r.reporter.SubscriptionCountInc(1)
 	}
+	if r.options.Debug {
+		fmt.Printf("resolver:trigger:subscription:added:%d:%d\n", triggerID, add.id.SubscriptionID)
+	}
 
-	var asyncDataSource AsyncSubscriptionDataSource
+	// Start the dedicated worker goroutine where the subscription updates are processed
+	// and writes are written to the client in a single threaded manner
+	go s.startWorker()
 
-	if async, ok := add.resolve.Trigger.Source.(AsyncSubscriptionDataSource); ok {
-		trig.cancel = func() {
-			cancel()
-			async.AsyncStop(triggerID)
+	if err := r.runSubscriptionStartHooks(add, triggerID, trig.ctx); err != nil {
+		r.asyncErrorWriter.WriteError(add.ctx, err, add.resolve.Response, add.writer)
+		removed := r.closeTriggerSubscriptions(triggerID, SubscriptionCloseKindNormal, func(a SubscriptionIdentifier) bool {
+			return a == add.id
+		})
+		if r.reporter != nil {
+			r.reporter.SubscriptionCountDec(removed)
 		}
-		asyncDataSource = async
+		if createdTrigger && len(trig.subscriptions) == 0 {
+			trig.cancel()
+			delete(r.triggers, triggerID)
+		}
+		return
 	}
 
-	go func() {
-		if r.options.Debug {
-			fmt.Printf("resolver:trigger:start:%d\n", triggerID)
-		}
-		if asyncDataSource != nil {
-			err = asyncDataSource.AsyncStart(cloneCtx, triggerID, add.input, updater)
-		} else {
-			err = add.resolve.Trigger.Source.Start(cloneCtx, add.input, updater)
-		}
-		if err != nil {
+	if createdTrigger {
+		go func() {
 			if r.options.Debug {
-				fmt.Printf("resolver:trigger:failed:%d\n", triggerID)
+				fmt.Printf("resolver:trigger:start:%d\n", triggerID)
+			}
+			if asyncDataSource != nil {
+				err = asyncDataSource.AsyncStart(cloneCtx, triggerID, add.input, updater)
+			} else {
+				err = add.resolve.Trigger.Source.Start(cloneCtx, add.input, updater)
+			}
+			if err != nil {
+				if r.options.Debug {
+					fmt.Printf("resolver:trigger:failed:%d\n", triggerID)
+				}
+				r.asyncErrorWriter.WriteError(add.ctx, err, add.resolve.Response, add.writer)
+				_ = r.emitTriggerClose(triggerID)
+				return
 			}
-			r.asyncErrorWriter.WriteError(add.ctx, err, add.resolve.Response, add.writer)
-			_ = r.emitTriggerClose(triggerID)
-			return
-		}
 
-		_ = r.emitTriggerInitialized(triggerID)
+			_ = r.emitTriggerInitialized(triggerID)
 
-		if r.options.Debug {
-			fmt.Printf("resolver:trigger:started:%d\n", triggerID)
-		}
-	}()
+			if r.options.Debug {
+				fmt.Printf("resolver:trigger:started:%d\n", triggerID)
+			}
+		}()
+	}
 
 }
 
+func (r *Resolver) runSubscriptionStartHooks(add *addSubscription, triggerID uint64, triggerCtx context.Context) error {
+	if len(add.resolve.Trigger.StartHooks) == 0 {
+		return nil
+	}
+
+	updater := &subscriptionUpdater{
+		debug:     r.options.Debug,
+		triggerID: triggerID,
+		ch:        r.events,
+		ctx:       triggerCtx,
+	}
+
+	for _, hook := range add.resolve.Trigger.StartHooks {
+		if hook == nil {
+			continue
+		}
+		if err := hook(add.ctx, add.id, add.input, updater); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func (r *Resolver) emitTriggerClose(triggerID uint64) error {
 	if r.options.Debug {
 		fmt.Printf("resolver:trigger:shutdown:%d\n", triggerID)
@@ -778,14 +823,14 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
 		fmt.Printf("resolver:trigger:update:%d\n", id)
 	}
 
-	for c, s := range trig.subscriptions {
-		c, s := c, s
-		if err := c.ctx.Err(); err != nil {
+	for _, s := range trig.subscriptions {
+		s := s
+		if err := s.ctx.ctx.Err(); err != nil {
 			continue // no need to schedule an event update when the client already disconnected
 		}
-		skip, err := s.resolve.Filter.SkipEvent(c, data, r.triggerUpdateBuf)
+		skip, err := s.resolve.Filter.SkipEvent(s.ctx, data, r.triggerUpdateBuf)
 		if err != nil {
-			r.asyncErrorWriter.WriteError(c, err, s.resolve.Response, s.writer)
+			r.asyncErrorWriter.WriteError(s.ctx, err, s.resolve.Response, s.writer)
 			continue
 		}
 		if skip {
@@ -793,14 +838,14 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
 		}
 
 		fn := func() {
-			r.executeSubscriptionUpdate(c, s, data)
+			r.executeSubscriptionUpdate(s.ctx, s, data)
 		}
 
 		select {
 		case <-r.ctx.Done():
 			// Skip sending all events if the resolver is shutting down
 			return
-		case <-c.ctx.Done():
+		case <-s.ctx.ctx.Done():
 			// Skip sending the event if the client disconnected
 		case s.workChan <- workItem{fn, false}:
 			// Send the event to the subscription worker
@@ -808,6 +853,57 @@ func (r *Resolver) handleTriggerUpdate(id uint64, data []byte) {
 	}
 }
 
+func (r *Resolver) handleTriggerSubscriptionUpdate(triggerID uint64, id SubscriptionIdentifier, data []byte) {
+	trig, ok := r.triggers[triggerID]
+	if !ok {
+		return
+	}
+	s, ok := trig.subscriptions[id]
+	if !ok {
+		return
+	}
+	if err := s.ctx.ctx.Err(); err != nil {
+		return
+	}
+
+	skip, err := s.resolve.Filter.SkipEvent(s.ctx, data, r.triggerUpdateBuf)
+	if err != nil {
+		r.asyncErrorWriter.WriteError(s.ctx, err, s.resolve.Response, s.writer)
+		return
+	}
+	if skip {
+		return
+	}
+
+	fn := func() {
+		r.executeSubscriptionUpdate(s.ctx, s, data)
+	}
+
+	select {
+	case <-r.ctx.Done():
+		return
+	case <-s.ctx.ctx.Done():
+	case s.workChan <- workItem{fn, false}:
+	}
+}
+
+func (r *Resolver) handleTriggerSubscriptionClose(triggerID uint64, id SubscriptionIdentifier, closeKind SubscriptionCloseKind) {
+	trig, ok := r.triggers[triggerID]
+	if !ok {
+		return
+	}
+
+	removed := r.closeTriggerSubscriptions(triggerID, closeKind, func(sID SubscriptionIdentifier) bool {
+		return sID == id
+	})
+	if r.reporter != nil {
+		r.reporter.SubscriptionCountDec(removed)
+	}
+	if len(trig.subscriptions) == 0 {
+		r.closeTrigger(trig.id, closeKind)
+	}
+}
+
 func (r *Resolver) closeTrigger(id uint64, kind SubscriptionCloseKind) {
 	if r.options.Debug {
 		fmt.Printf("resolver:trigger:close:%d\n", id)
@@ -863,7 +959,7 @@ func (r *Resolver) completeTriggerSubscriptions(id uint64, completeMatcher func(
 		return 0
 	}
 	removed := 0
-	for c, s := range trig.subscriptions {
+	for subID, s := range trig.subscriptions {
 		if completeMatcher != nil && !completeMatcher(s.id) {
 			continue
 		}
@@ -877,7 +973,7 @@ func (r *Resolver) completeTriggerSubscriptions(id uint64, completeMatcher func(
 
 		// Important because we remove the subscription from the trigger on the same goroutine
 		// as we send work to the subscription worker. We can ensure that no new work is sent to the worker after this point.
-		delete(trig.subscriptions, c)
+		delete(trig.subscriptions, subID)
 
 		if r.options.Debug {
 			fmt.Printf("resolver:trigger:subscription:closed:%d:%d\n", trig.id, s.id.SubscriptionID)
@@ -894,7 +990,7 @@ func (r *Resolver) closeTriggerSubscriptions(id uint64, closeKind SubscriptionCl
 		return 0
 	}
 	removed := 0
-	for c, s := range trig.subscriptions {
+	for subID, s := range trig.subscriptions {
 		if closeMatcher != nil && !closeMatcher(s.id) {
 			continue
 		}
@@ -908,7 +1004,7 @@ func (r *Resolver) closeTriggerSubscriptions(id uint64, closeKind SubscriptionCl
 
 		// Important because we remove the subscription from the trigger on the same goroutine
 		// as we send work to the subscription worker. We can ensure that no new work is sent to the worker after this point.
-		delete(trig.subscriptions, c)
+		delete(trig.subscriptions, subID)
 
 		if r.options.Debug {
 			fmt.Printf("resolver:trigger:subscription:closed:%d:%d\n", trig.id, s.id.SubscriptionID)
@@ -1207,21 +1303,44 @@ type subscriptionUpdater struct {
 	ctx       context.Context
 }
 
+func (s *subscriptionUpdater) emit(event subscriptionEvent) {
+	select {
+	case <-s.ctx.Done():
+		return
+	case s.ch <- event:
+	default:
+		// Avoid deadlocks when called from the resolver event loop goroutine.
+		go func() {
+			select {
+			case <-s.ctx.Done():
+			case s.ch <- event:
+			}
+		}()
+	}
+}
+
 func (s *subscriptionUpdater) Update(data []byte) {
 	if s.debug {
 		fmt.Printf("resolver:subscription_updater:update:%d\n", s.triggerID)
 	}
 
-	select {
-	case <-s.ctx.Done():
-		// Skip sending events if trigger is already done
-		return
-	case s.ch <- subscriptionEvent{
+	s.emit(subscriptionEvent{
 		triggerID: s.triggerID,
 		kind:      subscriptionEventKindTriggerUpdate,
 		data:      data,
-	}:
+	})
+}
+
+func (s *subscriptionUpdater) UpdateSubscription(id SubscriptionIdentifier, data []byte) {
+	if s.debug {
+		fmt.Printf("resolver:subscription_updater:update:subscription:%d:%d\n", s.triggerID, id.SubscriptionID)
 	}
+	s.emit(subscriptionEvent{
+		triggerID: s.triggerID,
+		id:        id,
+		kind:      subscriptionEventKindTriggerSubscriptionUpdate,
+		data:      data,
+	})
 }
 
 func (s *subscriptionUpdater) Complete() {
@@ -1229,20 +1348,19 @@ func (s *subscriptionUpdater) Complete() {
 		fmt.Printf("resolver:subscription_updater:complete:%d\n", s.triggerID)
 	}
 
-	select {
-	case <-s.ctx.Done():
+	if s.ctx.Err() != nil {
 		// Skip sending events if trigger is already done
 		if s.debug {
 			fmt.Printf("resolver:subscription_updater:complete:skip:%d\n", s.triggerID)
 		}
 		return
-	case s.ch <- subscriptionEvent{
+	}
+	s.emit(subscriptionEvent{
 		triggerID: s.triggerID,
 		kind:      subscriptionEventKindTriggerComplete,
-	}:
-		if s.debug {
-			fmt.Printf("resolver:subscription_updater:complete:sent_event:%d\n", s.triggerID)
-		}
+	})
+	if s.debug {
+		fmt.Printf("resolver:subscription_updater:complete:sent_event:%d\n", s.triggerID)
 	}
 }
 
@@ -1251,24 +1369,35 @@ func (s *subscriptionUpdater) Close(kind SubscriptionCloseKind) {
 		fmt.Printf("resolver:subscription_updater:close:%d\n", s.triggerID)
 	}
 
-	select {
-	case <-s.ctx.Done():
+	if s.ctx.Err() != nil {
 		// Skip sending events if trigger is already done
 		if s.debug {
 			fmt.Printf("resolver:subscription_updater:close:skip:%d\n", s.triggerID)
 		}
 		return
-	case s.ch <- subscriptionEvent{
+	}
+	s.emit(subscriptionEvent{
 		triggerID: s.triggerID,
 		kind:      subscriptionEventKindTriggerClose,
 		closeKind: kind,
-	}:
-		if s.debug {
-			fmt.Printf("resolver:subscription_updater:close:sent_event:%d\n", s.triggerID)
-		}
+	})
+	if s.debug {
+		fmt.Printf("resolver:subscription_updater:close:sent_event:%d\n", s.triggerID)
 	}
 }
 
+func (s *subscriptionUpdater) CloseSubscription(id SubscriptionIdentifier, kind SubscriptionCloseKind) {
+	if s.debug {
+		fmt.Printf("resolver:subscription_updater:close:subscription:%d:%d\n", s.triggerID, id.SubscriptionID)
+	}
+	s.emit(subscriptionEvent{
+		triggerID: s.triggerID,
+		id:        id,
+		kind:      subscriptionEventKindTriggerSubscriptionClose,
+		closeKind: kind,
+	})
+}
+
 type subscriptionEvent struct {
 	triggerID       uint64
 	id              SubscriptionIdentifier
@@ -1292,6 +1421,7 @@ type subscriptionEventKind int
 const (
 	subscriptionEventKindUnknown subscriptionEventKind = iota
 	subscriptionEventKindTriggerUpdate
+	subscriptionEventKindTriggerSubscriptionUpdate
 	subscriptionEventKindTriggerComplete
 	subscriptionEventKindAddSubscription
 	subscriptionEventKindRemoveSubscription
@@ -1299,13 +1429,18 @@ const (
 	subscriptionEventKindRemoveClient
 	subscriptionEventKindTriggerInitialized
 	subscriptionEventKindTriggerClose
+	subscriptionEventKindTriggerSubscriptionClose
 )
 
 type SubscriptionUpdater interface {
 	// Update sends an update to the client. It is not guaranteed that the update is sent immediately.
 	Update(data []byte)
+	// UpdateSubscription sends an update to a specific subscription on the same trigger.
+	UpdateSubscription(id SubscriptionIdentifier, data []byte)
 	// Complete also takes care of cleaning up the trigger and all subscriptions. No more updates should be sent after calling Complete.
 	Complete()
 	// Close closes the subscription and cleans up the trigger and all subscriptions. No more updates should be sent after calling Close.
 	Close(kind SubscriptionCloseKind)
+	// CloseSubscription closes a specific subscription and keeps sibling subscriptions active.
+	CloseSubscription(id SubscriptionIdentifier, kind SubscriptionCloseKind)
 }
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/resolve/resolve_test.go b/app/v2/pkg/engine/resolve/resolve_test.go
index 8e15ff9..e071e5d 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/resolve/resolve_test.go
+++ b/app/v2/pkg/engine/resolve/resolve_test.go
@@ -4813,6 +4813,42 @@ type _fakeStream struct {
 	isDone      atomic.Bool
 }
 
+type _controllableSubscriptionSource struct {
+	updaterCh  chan SubscriptionUpdater
+	startCalls atomic.Int32
+}
+
+func newControllableSubscriptionSource() *_controllableSubscriptionSource {
+	return &_controllableSubscriptionSource{
+		updaterCh: make(chan SubscriptionUpdater, 1),
+	}
+}
+
+func (s *_controllableSubscriptionSource) AwaitUpdater(t *testing.T, timeout time.Duration) SubscriptionUpdater {
+	t.Helper()
+	select {
+	case updater := <-s.updaterCh:
+		return updater
+	case <-time.After(timeout):
+		t.Fatal("timed out waiting for updater")
+		return nil
+	}
+}
+
+func (s *_controllableSubscriptionSource) UniqueRequestID(ctx *Context, input []byte, xxh *xxhash.Digest) (err error) {
+	_, err = xxh.Write(input)
+	return err
+}
+
+func (s *_controllableSubscriptionSource) Start(ctx *Context, input []byte, updater SubscriptionUpdater) error {
+	s.startCalls.Add(1)
+	select {
+	case s.updaterCh <- updater:
+	default:
+	}
+	return nil
+}
+
 func (f *_fakeStream) AwaitIsDone(t *testing.T, timeout time.Duration) {
 	t.Helper()
 	deadline := time.Now().Add(timeout)
@@ -5134,6 +5170,104 @@ func TestResolver_ResolveGraphQLSubscription(t *testing.T) {
 		assert.Contains(t, messages, `{"data":{"counter":2}}`)
 	})
 
+	t.Run("should execute start hooks before stream updates", func(t *testing.T) {
+		c, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		fakeStream := createFakeStream(func(counter int) (message string, done bool) {
+			return fmt.Sprintf(`{"data":{"counter":%d}}`, counter), counter == 0
+		}, 20*time.Millisecond, nil)
+
+		resolver, plan, recorder, id := setup(c, fakeStream)
+		plan.Trigger.StartHooks = []SubscriptionStartHook{
+			func(ctx *Context, id SubscriptionIdentifier, input []byte, updater SubscriptionUpdater) error {
+				updater.UpdateSubscription(id, []byte(`{"data":{"counter":100}}`))
+				return nil
+			},
+		}
+
+		ctx := &Context{
+			ctx: context.Background(),
+		}
+
+		err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
+		assert.NoError(t, err)
+
+		recorder.AwaitComplete(t, defaultTimeout)
+		messages := recorder.Messages()
+		assert.GreaterOrEqual(t, len(messages), 2)
+		assert.Equal(t, `{"data":{"counter":100}}`, messages[0])
+		assert.Equal(t, `{"data":{"counter":0}}`, messages[1])
+	})
+
+	t.Run("should not start trigger when start hook fails", func(t *testing.T) {
+		c, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		source := newControllableSubscriptionSource()
+		resolver, plan, recorder, id := setup(c, source)
+		plan.Trigger.StartHooks = []SubscriptionStartHook{
+			func(ctx *Context, id SubscriptionIdentifier, input []byte, updater SubscriptionUpdater) error {
+				return errors.New("hook failed")
+			},
+		}
+
+		ctx := &Context{
+			ctx: context.Background(),
+		}
+
+		err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder, id)
+		assert.NoError(t, err)
+		recorder.AwaitClosed(t, defaultTimeout)
+		recorder.AwaitAnyMessageCount(t, defaultTimeout)
+		assert.Equal(t, int32(0), source.startCalls.Load())
+		assert.Contains(t, recorder.Messages()[0], `hook failed`)
+	})
+
+	t.Run("should support targeted subscription updates and close", func(t *testing.T) {
+		c, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		source := newControllableSubscriptionSource()
+		resolver, plan, _, _ := setup(c, source)
+
+		id1 := SubscriptionIdentifier{ConnectionID: 1, SubscriptionID: 1}
+		id2 := SubscriptionIdentifier{ConnectionID: 2, SubscriptionID: 2}
+
+		recorder1 := &SubscriptionRecorder{
+			buf:      &bytes.Buffer{},
+			messages: []string{},
+			complete: atomic.Bool{},
+		}
+		recorder2 := &SubscriptionRecorder{
+			buf:      &bytes.Buffer{},
+			messages: []string{},
+			complete: atomic.Bool{},
+		}
+
+		ctx := &Context{ctx: context.Background()}
+
+		err := resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder1, id1)
+		assert.NoError(t, err)
+		err = resolver.AsyncResolveGraphQLSubscription(ctx, plan, recorder2, id2)
+		assert.NoError(t, err)
+		time.Sleep(50 * time.Millisecond)
+
+		updater := source.AwaitUpdater(t, defaultTimeout)
+		updater.UpdateSubscription(id1, []byte(`{"data":{"counter":1}}`))
+		updater.UpdateSubscription(id2, []byte(`{"data":{"counter":2}}`))
+		recorder1.AwaitMessages(t, 1, defaultTimeout)
+		recorder2.AwaitMessages(t, 1, defaultTimeout)
+		updater.CloseSubscription(id1, SubscriptionCloseKindNormal)
+		updater.CloseSubscription(id2, SubscriptionCloseKindNormal)
+
+		recorder1.AwaitClosed(t, defaultTimeout)
+		recorder2.AwaitClosed(t, defaultTimeout)
+		assert.Equal(t, int32(1), source.startCalls.Load())
+		assert.Equal(t, []string{`{"data":{"counter":1}}`}, recorder1.Messages())
+		assert.Equal(t, []string{`{"data":{"counter":2}}`}, recorder2.Messages())
+	})
+
 	t.Run("should successfully delete multiple finished subscriptions", func(t *testing.T) {
 		c, cancel := context.WithCancel(context.Background())
 		defer cancel()
diff --git a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/resolve/response.go b/app/v2/pkg/engine/resolve/response.go
index b98f4c0..b1cb405 100644
--- a/tmp/agent-patch-flux-pr-1309.1-of-1.2026-02-28__13-38-10__gpt-5-3-codex/app/v2/pkg/engine/resolve/response.go
+++ b/app/v2/pkg/engine/resolve/response.go
@@ -20,6 +20,7 @@ type GraphQLSubscriptionTrigger struct {
 	InputTemplate  InputTemplate
 	Variables      Variables
 	Source         SubscriptionDataSource
+	StartHooks     []SubscriptionStartHook
 	PostProcessing PostProcessingConfiguration
 	QueryPlan      *QueryPlan
 }