From 74ab5e5a62e12a067442d15649088c011b3cee9d Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Fri, 6 Mar 2026 08:53:04 +0100 Subject: [PATCH 1/8] feat: option to remove nats consumers on shutdown --- demo/pkg/subgraphs/subgraphs.go | 4 +- router-tests/testenv/testenv.go | 2 +- router/pkg/config/config.go | 7 +-- router/pkg/config/config.schema.json | 5 ++ router/pkg/config/testdata/config_full.json | 6 ++- router/pkg/pubsub/nats/adapter.go | 58 +++++++++++++++++---- router/pkg/pubsub/nats/provider_builder.go | 2 +- 7 files changed, 64 insertions(+), 20 deletions(-) diff --git a/demo/pkg/subgraphs/subgraphs.go b/demo/pkg/subgraphs/subgraphs.go index 3b5d294fca..44764f14bd 100644 --- a/demo/pkg/subgraphs/subgraphs.go +++ b/demo/pkg/subgraphs/subgraphs.go @@ -212,7 +212,7 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) { natsPubSubByProviderID := map[string]natsPubsub.Adapter{} - defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{ + defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", false, datasource.ProviderOpts{ StreamMetricStore: rmetric.NewNoopStreamMetricStore(), }) if err != nil { @@ -223,7 +223,7 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) { } natsPubSubByProviderID["default"] = defaultAdapter - myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{ + myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", false, datasource.ProviderOpts{ StreamMetricStore: rmetric.NewNoopStreamMetricStore(), }) if err != nil { diff --git a/router-tests/testenv/testenv.go b/router-tests/testenv/testenv.go index cea58ce43c..faeea30212 100644 --- a/router-tests/testenv/testenv.go +++ b/router-tests/testenv/testenv.go @@ -2911,7 +2911,7 @@ func subgraphOptions(ctx context.Context, t testing.TB, logger *zap.Logger, nats } natsPubSubByProviderID := make(map[string]pubsubNats.Adapter, len(DemoNatsProviders)) for _, sourceName := range DemoNatsProviders { - adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr", datasource.ProviderOpts{ + adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr", false, datasource.ProviderOpts{ StreamMetricStore: rmetric.NewNoopStreamMetricStore(), }) require.NoError(t, err) diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 10881ddd25..dbe474b401 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -617,9 +617,10 @@ type NatsAuthentication struct { } type NatsEventSource struct { - ID string `yaml:"id,omitempty"` - URL string `yaml:"url,omitempty"` - Authentication *NatsAuthentication `yaml:"authentication,omitempty"` + ID string `yaml:"id,omitempty"` + URL string `yaml:"url,omitempty"` + Authentication *NatsAuthentication `yaml:"authentication,omitempty"` + DeleteDurableConsumersOnShutdown bool `yaml:"delete_durable_consumers_on_shutdown" envDefault:"false"` } func (n NatsEventSource) GetID() string { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 2948ea0e46..e2fa1fb928 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -2580,6 +2580,11 @@ } } ] + }, + "delete_durable_consumers_on_shutdown": { + "type": "boolean", + "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down. This prevents zombie consumers accumulating on the NATS server across Kubernetes rolling deployments where each pod has a unique hostname baked into the consumer name. Defaults to false.", + "default": false } } } diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index f97deeea5e..98c86135c7 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -656,7 +656,8 @@ { "ID": "default", "URL": "nats://localhost:4222", - "Authentication": null + "Authentication": null, + "DeleteDurableConsumersOnShutdown": false }, { "ID": "my-nats", @@ -667,7 +668,8 @@ "Username": "admin" }, "Token": null - } + }, + "DeleteDurableConsumersOnShutdown": false } ], "Kafka": [ diff --git a/router/pkg/pubsub/nats/adapter.go b/router/pkg/pubsub/nats/adapter.go index 0c9c0c140f..1ad2cc4da1 100644 --- a/router/pkg/pubsub/nats/adapter.go +++ b/router/pkg/pubsub/nats/adapter.go @@ -32,6 +32,11 @@ type Adapter interface { // Ensure ProviderAdapter implements ProviderSubscriptionHooks var _ datasource.Adapter = (*ProviderAdapter)(nil) +type trackedConsumer struct { + streamName string + consumerName string +} + // ProviderAdapter implements the AdapterInterface for NATS pub/sub type ProviderAdapter struct { ctx context.Context @@ -46,6 +51,10 @@ type ProviderAdapter struct { opts []nats.Option flushTimeout time.Duration streamMetricStore metric.StreamMetricStore + + deleteConsumersOnShutdown bool + trackedConsumersMu sync.Mutex + trackedConsumers []trackedConsumer } // getInstanceIdentifier returns an identifier for the current instance. @@ -116,6 +125,15 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, cfg datasource.Subscrip return datasource.NewError(fmt.Sprintf(`failed to create or update consumer for stream "%s"`, subConf.StreamConfiguration.StreamName), err) } + if p.deleteConsumersOnShutdown { + p.trackedConsumersMu.Lock() + p.trackedConsumers = append(p.trackedConsumers, trackedConsumer{ + streamName: subConf.StreamConfiguration.StreamName, + consumerName: durableConsumerName, + }) + p.trackedConsumersMu.Unlock() + } + p.closeWg.Add(1) go func() { @@ -383,6 +401,23 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error { var shutdownErr error + // Delete durable consumers before closing the connection — the JetStream + // API requires an open connection to reach the NATS server. + if p.deleteConsumersOnShutdown && p.js != nil { + p.trackedConsumersMu.Lock() + consumers := p.trackedConsumers + p.trackedConsumersMu.Unlock() + for _, tc := range consumers { + if err := p.js.DeleteConsumer(ctx, tc.streamName, tc.consumerName); err != nil { + p.logger.Warn("failed to delete durable consumer on shutdown", + zap.String("stream", tc.streamName), + zap.String("consumer", tc.consumerName), + zap.Error(err), + ) + } + } + } + fErr := p.flush(ctx) if fErr != nil { shutdownErr = errors.Join(shutdownErr, fErr) @@ -407,7 +442,7 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error { return nil } -func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, providerOpts datasource.ProviderOpts) (Adapter, error) { +func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, deleteConsumersOnShutdown bool, providerOpts datasource.ProviderOpts) (Adapter, error) { if logger == nil { logger = zap.NewNop() } @@ -422,16 +457,17 @@ func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats ctx, cancelFunc := context.WithCancel(ctx) return &ProviderAdapter{ - ctx: ctx, - cancel: cancelFunc, - logger: logger.With(zap.String("pubsub", "nats")), - closeWg: sync.WaitGroup{}, - hostName: hostName, - routerListenAddr: routerListenAddr, - url: url, - opts: opts, - flushTimeout: 10 * time.Second, - streamMetricStore: store, + ctx: ctx, + cancel: cancelFunc, + logger: logger.With(zap.String("pubsub", "nats")), + closeWg: sync.WaitGroup{}, + hostName: hostName, + routerListenAddr: routerListenAddr, + url: url, + opts: opts, + flushTimeout: 10 * time.Second, + streamMetricStore: store, + deleteConsumersOnShutdown: deleteConsumersOnShutdown, }, nil } diff --git a/router/pkg/pubsub/nats/provider_builder.go b/router/pkg/pubsub/nats/provider_builder.go index d8314305ae..8b590d7494 100644 --- a/router/pkg/pubsub/nats/provider_builder.go +++ b/router/pkg/pubsub/nats/provider_builder.go @@ -123,7 +123,7 @@ func buildProvider(ctx context.Context, provider config.NatsEventSource, logger return nil, fmt.Errorf("failed to build options for Nats provider with ID \"%s\": %w", provider.ID, err) } - adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, providerOpts) + adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.DeleteDurableConsumersOnShutdown, providerOpts) if err != nil { return nil, fmt.Errorf("failed to create adapter for Nats provider with ID \"%s\": %w", provider.ID, err) } From 698a679f46202bc850f189c8b3ed370f1936075e Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:02:51 +0100 Subject: [PATCH 2/8] Use a dedicated config subcategory for consumer configuration --- router/pkg/config/config.go | 12 ++++++++---- router/pkg/config/config.schema.json | 15 +++++++++++---- router/pkg/config/testdata/config_full.json | 8 ++++++-- 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index dbe474b401..8722c80b3b 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -616,11 +616,15 @@ type NatsAuthentication struct { NatsTokenBasedAuthentication `yaml:"token,inline"` } +type NatsConsumersConfiguration struct { + DeleteOnShutdown bool `yaml:"delete_on_shutdown" envDefault:"false"` +} + type NatsEventSource struct { - ID string `yaml:"id,omitempty"` - URL string `yaml:"url,omitempty"` - Authentication *NatsAuthentication `yaml:"authentication,omitempty"` - DeleteDurableConsumersOnShutdown bool `yaml:"delete_durable_consumers_on_shutdown" envDefault:"false"` + ID string `yaml:"id,omitempty"` + URL string `yaml:"url,omitempty"` + Authentication *NatsAuthentication `yaml:"authentication,omitempty"` + Consumers NatsConsumersConfiguration `yaml:"consumers,omitempty"` } func (n NatsEventSource) GetID() string { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index e2fa1fb928..4fe65a449c 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -2581,10 +2581,17 @@ } ] }, - "delete_durable_consumers_on_shutdown": { - "type": "boolean", - "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down. This prevents zombie consumers accumulating on the NATS server across Kubernetes rolling deployments where each pod has a unique hostname baked into the consumer name. Defaults to false.", - "default": false + "consumers": { + "type": "object", + "description": "Configuration for durable JetStream consumers managed by this NATS provider.", + "additionalProperties": false, + "properties": { + "delete_on_shutdown": { + "type": "boolean", + "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down. This prevents zombie consumers accumulating on the NATS server across Kubernetes rolling deployments where each pod has a unique hostname baked into the consumer name. Defaults to false.", + "default": false + } + } } } } diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 98c86135c7..8bf3fb2c33 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -657,7 +657,9 @@ "ID": "default", "URL": "nats://localhost:4222", "Authentication": null, - "DeleteDurableConsumersOnShutdown": false + "Consumers": { + "DeleteOnShutdown": false + } }, { "ID": "my-nats", @@ -669,7 +671,9 @@ }, "Token": null }, - "DeleteDurableConsumersOnShutdown": false + "Consumers": { + "DeleteOnShutdown": false + } } ], "Kafka": [ From c2dd6ac5899e33c5585f79bd996d34f56fdac29b Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:42:20 +0100 Subject: [PATCH 3/8] fix: avoid multiple consumer trackings with the same name --- router/pkg/pubsub/nats/adapter.go | 78 +++++++++++++++++-------------- 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/router/pkg/pubsub/nats/adapter.go b/router/pkg/pubsub/nats/adapter.go index 1ad2cc4da1..fb3e0c9da7 100644 --- a/router/pkg/pubsub/nats/adapter.go +++ b/router/pkg/pubsub/nats/adapter.go @@ -32,9 +32,9 @@ type Adapter interface { // Ensure ProviderAdapter implements ProviderSubscriptionHooks var _ datasource.Adapter = (*ProviderAdapter)(nil) -type trackedConsumer struct { - streamName string - consumerName string +type consumerConfig struct { + deleteOnShutdown bool + trackedConsumers sync.Map // used as a set, key = consumer name, value = stream name } // ProviderAdapter implements the AdapterInterface for NATS pub/sub @@ -51,10 +51,7 @@ type ProviderAdapter struct { opts []nats.Option flushTimeout time.Duration streamMetricStore metric.StreamMetricStore - - deleteConsumersOnShutdown bool - trackedConsumersMu sync.Mutex - trackedConsumers []trackedConsumer + consumerConfig consumerConfig } // getInstanceIdentifier returns an identifier for the current instance. @@ -125,13 +122,9 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, cfg datasource.Subscrip return datasource.NewError(fmt.Sprintf(`failed to create or update consumer for stream "%s"`, subConf.StreamConfiguration.StreamName), err) } - if p.deleteConsumersOnShutdown { - p.trackedConsumersMu.Lock() - p.trackedConsumers = append(p.trackedConsumers, trackedConsumer{ - streamName: subConf.StreamConfiguration.StreamName, - consumerName: durableConsumerName, - }) - p.trackedConsumersMu.Unlock() + // Track newly created durable consumer so we can later delete them, if it's necessary. + if p.consumerConfig.deleteOnShutdown { + p.consumerConfig.trackedConsumers.Store(durableConsumerName, subConf.StreamConfiguration.StreamName) } p.closeWg.Add(1) @@ -401,21 +394,32 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error { var shutdownErr error - // Delete durable consumers before closing the connection — the JetStream - // API requires an open connection to reach the NATS server. - if p.deleteConsumersOnShutdown && p.js != nil { - p.trackedConsumersMu.Lock() - consumers := p.trackedConsumers - p.trackedConsumersMu.Unlock() - for _, tc := range consumers { - if err := p.js.DeleteConsumer(ctx, tc.streamName, tc.consumerName); err != nil { + // Delete durable consumers before closing the connection. + if p.consumerConfig.deleteOnShutdown && p.js != nil { + p.consumerConfig.trackedConsumers.Range(func(key, value any) bool { + consumerName, ok := key.(string) + if !ok { + // skip this odd element, should not happen in reality + return true + } + + streamName, ok := value.(string) + if !ok { + // skip this odd element, should not happen in reality + return true + } + + err := p.js.DeleteConsumer(ctx, streamName, consumerName) + if err != nil { p.logger.Warn("failed to delete durable consumer on shutdown", - zap.String("stream", tc.streamName), - zap.String("consumer", tc.consumerName), + zap.String("stream", streamName), + zap.String("consumer", consumerName), zap.Error(err), ) } - } + + return true + }) } fErr := p.flush(ctx) @@ -457,17 +461,19 @@ func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats ctx, cancelFunc := context.WithCancel(ctx) return &ProviderAdapter{ - ctx: ctx, - cancel: cancelFunc, - logger: logger.With(zap.String("pubsub", "nats")), - closeWg: sync.WaitGroup{}, - hostName: hostName, - routerListenAddr: routerListenAddr, - url: url, - opts: opts, - flushTimeout: 10 * time.Second, - streamMetricStore: store, - deleteConsumersOnShutdown: deleteConsumersOnShutdown, + ctx: ctx, + cancel: cancelFunc, + logger: logger.With(zap.String("pubsub", "nats")), + closeWg: sync.WaitGroup{}, + hostName: hostName, + routerListenAddr: routerListenAddr, + url: url, + opts: opts, + flushTimeout: 10 * time.Second, + streamMetricStore: store, + consumerConfig: consumerConfig{ + deleteOnShutdown: deleteConsumersOnShutdown, + }, }, nil } From 1bb291ef30fd7efef03e439bb1f92a3adcabaf44 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Mon, 9 Mar 2026 15:20:07 +0100 Subject: [PATCH 4/8] chore: outsource into new functions --- router/pkg/config/config.schema.json | 2 +- router/pkg/pubsub/nats/adapter.go | 121 +++++++++++++-------- router/pkg/pubsub/nats/provider_builder.go | 2 +- 3 files changed, 76 insertions(+), 49 deletions(-) diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 4fe65a449c..33f29fb0e6 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -2588,7 +2588,7 @@ "properties": { "delete_on_shutdown": { "type": "boolean", - "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down. This prevents zombie consumers accumulating on the NATS server across Kubernetes rolling deployments where each pod has a unique hostname baked into the consumer name. Defaults to false.", + "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down. Defaults to false.", "default": false } } diff --git a/router/pkg/pubsub/nats/adapter.go b/router/pkg/pubsub/nats/adapter.go index fb3e0c9da7..7ac8cbbc1d 100644 --- a/router/pkg/pubsub/nats/adapter.go +++ b/router/pkg/pubsub/nats/adapter.go @@ -34,7 +34,7 @@ var _ datasource.Adapter = (*ProviderAdapter)(nil) type consumerConfig struct { deleteOnShutdown bool - trackedConsumers sync.Map // used as a set, key = consumer name, value = stream name + trackedConsumers sync.Map // key = consumer name, value = stream name } // ProviderAdapter implements the AdapterInterface for NATS pub/sub @@ -103,28 +103,12 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, cfg datasource.Subscrip } if subConf.StreamConfiguration != nil { - durableConsumerName, err := p.getDurableConsumerName(subConf.StreamConfiguration.Consumer, subConf.Subjects) - if err != nil { - return err - } - consumerConfig := jetstream.ConsumerConfig{ - Durable: durableConsumerName, - FilterSubjects: subConf.Subjects, - } - // Durable consumers are removed automatically only if the InactiveThreshold value is set - if subConf.StreamConfiguration.ConsumerInactiveThreshold > 0 { - consumerConfig.InactiveThreshold = time.Duration(subConf.StreamConfiguration.ConsumerInactiveThreshold) * time.Second - } - - consumer, err := p.js.CreateOrUpdateConsumer(ctx, subConf.StreamConfiguration.StreamName, consumerConfig) + consumer, err := p.createOrUpdateDurableConsumer(ctx, subConf) if err != nil { log.Error("creating or updating consumer", zap.Error(err)) - return datasource.NewError(fmt.Sprintf(`failed to create or update consumer for stream "%s"`, subConf.StreamConfiguration.StreamName), err) - } - - // Track newly created durable consumer so we can later delete them, if it's necessary. - if p.consumerConfig.deleteOnShutdown { - p.consumerConfig.trackedConsumers.Store(durableConsumerName, subConf.StreamConfiguration.StreamName) + return datasource.NewError( + fmt.Sprintf(`failed to create or update consumer for stream "%s"`, subConf.StreamConfiguration.StreamName), err, + ) } p.closeWg.Add(1) @@ -394,32 +378,8 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error { var shutdownErr error - // Delete durable consumers before closing the connection. - if p.consumerConfig.deleteOnShutdown && p.js != nil { - p.consumerConfig.trackedConsumers.Range(func(key, value any) bool { - consumerName, ok := key.(string) - if !ok { - // skip this odd element, should not happen in reality - return true - } - - streamName, ok := value.(string) - if !ok { - // skip this odd element, should not happen in reality - return true - } - - err := p.js.DeleteConsumer(ctx, streamName, consumerName) - if err != nil { - p.logger.Warn("failed to delete durable consumer on shutdown", - zap.String("stream", streamName), - zap.String("consumer", consumerName), - zap.Error(err), - ) - } - - return true - }) + if p.consumerConfig.deleteOnShutdown { + p.deleteDurableConsumers(ctx) } fErr := p.flush(ctx) @@ -446,6 +406,73 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error { return nil } +// createOrUpdateDurableConsumer creates or updates the durable consumer on the nats server, +// based on configuration settings from the adapter and subConf. +// It computes the consumer name, adds or updates the consumer on the nats server and +// keeps track of added consumers for later deletion. +func (p *ProviderAdapter) createOrUpdateDurableConsumer(ctx context.Context, subConf *SubscriptionEventConfiguration) (jetstream.Consumer, error) { + if p.js == nil { + return nil, nil + } + + durableConsumerName, err := p.getDurableConsumerName(subConf.StreamConfiguration.Consumer, subConf.Subjects) + if err != nil { + return nil, fmt.Errorf("compute consumer name: %w", err) + } + consumerConfig := jetstream.ConsumerConfig{ + Durable: durableConsumerName, + FilterSubjects: subConf.Subjects, + } + // Durable consumers are removed automatically only if the InactiveThreshold value is set + if subConf.StreamConfiguration.ConsumerInactiveThreshold > 0 { + consumerConfig.InactiveThreshold = time.Duration(subConf.StreamConfiguration.ConsumerInactiveThreshold) * time.Second + } + + consumer, err := p.js.CreateOrUpdateConsumer(ctx, subConf.StreamConfiguration.StreamName, consumerConfig) + if err != nil { + return nil, fmt.Errorf("create or update consumer on nats: %w", err) + } + + // Track newly created durable consumer so we can later delete them, if it's necessary. + if p.consumerConfig.deleteOnShutdown { + p.consumerConfig.trackedConsumers.Store(durableConsumerName, subConf.StreamConfiguration.StreamName) + } + + return consumer, nil +} + +// deleteDurableConsumers deletes all durable consumers used by this router instance from the nats server. +func (p *ProviderAdapter) deleteDurableConsumers(ctx context.Context) { + if p.js == nil { + return + } + + p.consumerConfig.trackedConsumers.Range(func(key, value any) bool { + consumerName, ok := key.(string) + if !ok { + // skip this odd element, should not happen in reality + return true + } + + streamName, ok := value.(string) + if !ok { + // skip this odd element, should not happen in reality + return true + } + + err := p.js.DeleteConsumer(ctx, streamName, consumerName) + if err != nil { + p.logger.Warn("failed to delete durable consumer on shutdown", + zap.String("stream", streamName), + zap.String("consumer", consumerName), + zap.Error(err), + ) + } + + return true + }) +} + func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, deleteConsumersOnShutdown bool, providerOpts datasource.ProviderOpts) (Adapter, error) { if logger == nil { logger = zap.NewNop() diff --git a/router/pkg/pubsub/nats/provider_builder.go b/router/pkg/pubsub/nats/provider_builder.go index 8b590d7494..814b84ac9b 100644 --- a/router/pkg/pubsub/nats/provider_builder.go +++ b/router/pkg/pubsub/nats/provider_builder.go @@ -123,7 +123,7 @@ func buildProvider(ctx context.Context, provider config.NatsEventSource, logger return nil, fmt.Errorf("failed to build options for Nats provider with ID \"%s\": %w", provider.ID, err) } - adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.DeleteDurableConsumersOnShutdown, providerOpts) + adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.Consumers.DeleteOnShutdown, providerOpts) if err != nil { return nil, fmt.Errorf("failed to create adapter for Nats provider with ID \"%s\": %w", provider.ID, err) } From 6116698f077c75805e6710da1479dd28bf82cca1 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Mon, 9 Mar 2026 16:53:42 +0100 Subject: [PATCH 5/8] chore: add test --- router-tests/events/nats_events_test.go | 57 +++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/router-tests/events/nats_events_test.go b/router-tests/events/nats_events_test.go index 5d3b2ec2cf..ee11639e56 100644 --- a/router-tests/events/nats_events_test.go +++ b/router-tests/events/nats_events_test.go @@ -1200,6 +1200,63 @@ func TestNatsEvents(t *testing.T) { }) }) + t.Run("durable consumer is deleted from nats server on router shutdown", func(t *testing.T) { + t.Parallel() + + env, err := testenv.CreateTestEnv(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + // Remove feature flag configs to work around a bug where buildGraphMux + // overwrites s.pubSubProviders on each call, orphaning the base providers. + routerConfig.FeatureFlagConfigs = nil + }, + ModifyEventsConfiguration: func(cfg *config.EventsConfiguration) { + for i := range cfg.Providers.Nats { + cfg.Providers.Nats[i].Consumers.DeleteOnShutdown = true + } + }, + }) + require.NoError(t, err) + t.Cleanup(env.Shutdown) + + js, err := jetstream.New(env.NatsConnectionDefault) + require.NoError(t, err) + + streamName := env.GetPubSubName("streamName") + _, err = js.CreateOrUpdateStream(env.Context, jetstream.StreamConfig{ + Name: streamName, + Subjects: []string{env.GetPubSubName("employeeUpdated.>")}, + Storage: jetstream.MemoryStorage, + }) + require.NoError(t, err) + + conn := env.InitGraphQLWebSocketConnection(nil, nil, nil) + err = conn.WriteJSON(&testenv.WebSocketMessage{ + ID: "1", + Type: "subscribe", + Payload: []byte(`{"query":"subscription { employeeUpdatedNatsStream(id: 12) { id }}"}`), + }) + require.NoError(t, err) + env.WaitForSubscriptionCount(1, NatsWaitTimeout) + + // Verify the durable consumer was created on the stream + stream, err := js.Stream(env.Context, streamName) + require.NoError(t, err) + streamInfo, err := stream.Info(env.Context) + require.NoError(t, err) + require.Equal(t, 1, streamInfo.State.Consumers, "expected one consumer before shutdown") + + // Shut down the router; this should trigger deletion of durable consumers + env.Shutdown() + + // env.Context is cancelled by Shutdown, so use a fresh context for JetStream queries + ctx := context.Background() + streamInfo, err = stream.Info(ctx) + require.NoError(t, err) + require.Equal(t, 0, streamInfo.State.Consumers, "expected no consumers after shutdown with delete_on_shutdown enabled") + }) + t.Run("subscribing to a non-existent stream returns an error", func(t *testing.T) { t.Parallel() From d628b8883c2da28cdeb6c7107eca293218dc5b64 Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Tue, 10 Mar 2026 10:08:57 +0100 Subject: [PATCH 6/8] chore: update config description --- router/pkg/config/config.schema.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 33f29fb0e6..84bc0f691a 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -2588,7 +2588,7 @@ "properties": { "delete_on_shutdown": { "type": "boolean", - "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down. Defaults to false.", + "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down normally. Defaults to false.", "default": false } } From 8dd9a7264feb1afbc343c61ee912b46458c91d4e Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Tue, 10 Mar 2026 11:17:44 +0100 Subject: [PATCH 7/8] chore: adjust config parameters --- router-tests/events/nats_events_test.go | 2 +- router/pkg/config/config.go | 6 +++++- router/pkg/config/config.schema.json | 17 ++++++++++++----- router/pkg/config/testdata/config_full.json | 8 ++++++-- router/pkg/pubsub/nats/provider_builder.go | 2 +- 5 files changed, 25 insertions(+), 10 deletions(-) diff --git a/router-tests/events/nats_events_test.go b/router-tests/events/nats_events_test.go index ee11639e56..acf4fcc647 100644 --- a/router-tests/events/nats_events_test.go +++ b/router-tests/events/nats_events_test.go @@ -1213,7 +1213,7 @@ func TestNatsEvents(t *testing.T) { }, ModifyEventsConfiguration: func(cfg *config.EventsConfiguration) { for i := range cfg.Providers.Nats { - cfg.Providers.Nats[i].Consumers.DeleteOnShutdown = true + cfg.Providers.Nats[i].Consumers.Durable.DeleteOnShutdown = true } }, }) diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index 9d5553a35c..ba52f28ba7 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -616,10 +616,14 @@ type NatsAuthentication struct { NatsTokenBasedAuthentication `yaml:"token,inline"` } -type NatsConsumersConfiguration struct { +type NatsDurableConsumersConfiguration struct { DeleteOnShutdown bool `yaml:"delete_on_shutdown" envDefault:"false"` } +type NatsConsumersConfiguration struct { + Durable NatsDurableConsumersConfiguration `yaml:"durable,omitempty"` +} + type NatsEventSource struct { ID string `yaml:"id,omitempty"` URL string `yaml:"url,omitempty"` diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 84bc0f691a..ac01bedeb7 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -2583,13 +2583,20 @@ }, "consumers": { "type": "object", - "description": "Configuration for durable JetStream consumers managed by this NATS provider.", + "description": "Configuration for JetStream consumers managed by this NATS provider.", "additionalProperties": false, "properties": { - "delete_on_shutdown": { - "type": "boolean", - "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down normally. Defaults to false.", - "default": false + "durable": { + "type": "object", + "description": "Configuration for durable JetStream consumers managed by this NATS provider.", + "additionalProperties": false, + "properties": { + "delete_on_shutdown": { + "type": "boolean", + "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down normally. Defaults to false.", + "default": false + } + } } } } diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index 8bf3fb2c33..e9e85f1e97 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -658,7 +658,9 @@ "URL": "nats://localhost:4222", "Authentication": null, "Consumers": { - "DeleteOnShutdown": false + "Durable": { + "DeleteOnShutdown": false + } } }, { @@ -672,7 +674,9 @@ "Token": null }, "Consumers": { - "DeleteOnShutdown": false + "Durable": { + "DeleteOnShutdown": false + } } } ], diff --git a/router/pkg/pubsub/nats/provider_builder.go b/router/pkg/pubsub/nats/provider_builder.go index 814b84ac9b..17b1e4febb 100644 --- a/router/pkg/pubsub/nats/provider_builder.go +++ b/router/pkg/pubsub/nats/provider_builder.go @@ -123,7 +123,7 @@ func buildProvider(ctx context.Context, provider config.NatsEventSource, logger return nil, fmt.Errorf("failed to build options for Nats provider with ID \"%s\": %w", provider.ID, err) } - adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.Consumers.DeleteOnShutdown, providerOpts) + adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.Consumers.Durable.DeleteOnShutdown, providerOpts) if err != nil { return nil, fmt.Errorf("failed to create adapter for Nats provider with ID \"%s\": %w", provider.ID, err) } From 1263933492ea7de5401ecd41a9d4b522e21e53be Mon Sep 17 00:00:00 2001 From: Dominik Korittki <23359034+dkorittki@users.noreply.github.com> Date: Tue, 10 Mar 2026 13:09:12 +0100 Subject: [PATCH 8/8] chore: add test to verify consumers stay --- router-tests/events/nats_events_test.go | 57 +++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/router-tests/events/nats_events_test.go b/router-tests/events/nats_events_test.go index acf4fcc647..3a42dc1467 100644 --- a/router-tests/events/nats_events_test.go +++ b/router-tests/events/nats_events_test.go @@ -1257,6 +1257,63 @@ func TestNatsEvents(t *testing.T) { require.Equal(t, 0, streamInfo.State.Consumers, "expected no consumers after shutdown with delete_on_shutdown enabled") }) + t.Run("durable consumer is not deleted from nats server on router shutdown", func(t *testing.T) { + t.Parallel() + + env, err := testenv.CreateTestEnv(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + // Remove feature flag configs to work around a bug where buildGraphMux + // overwrites s.pubSubProviders on each call, orphaning the base providers. + routerConfig.FeatureFlagConfigs = nil + }, + ModifyEventsConfiguration: func(cfg *config.EventsConfiguration) { + for i := range cfg.Providers.Nats { + cfg.Providers.Nats[i].Consumers.Durable.DeleteOnShutdown = false + } + }, + }) + require.NoError(t, err) + t.Cleanup(env.Shutdown) + + js, err := jetstream.New(env.NatsConnectionDefault) + require.NoError(t, err) + + streamName := env.GetPubSubName("streamName") + _, err = js.CreateOrUpdateStream(env.Context, jetstream.StreamConfig{ + Name: streamName, + Subjects: []string{env.GetPubSubName("employeeUpdated.>")}, + Storage: jetstream.MemoryStorage, + }) + require.NoError(t, err) + + conn := env.InitGraphQLWebSocketConnection(nil, nil, nil) + err = conn.WriteJSON(&testenv.WebSocketMessage{ + ID: "1", + Type: "subscribe", + Payload: []byte(`{"query":"subscription { employeeUpdatedNatsStream(id: 12) { id }}"}`), + }) + require.NoError(t, err) + env.WaitForSubscriptionCount(1, NatsWaitTimeout) + + // Verify the durable consumer was created on the stream + stream, err := js.Stream(env.Context, streamName) + require.NoError(t, err) + streamInfo, err := stream.Info(env.Context) + require.NoError(t, err) + require.Equal(t, 1, streamInfo.State.Consumers, "expected one consumer before shutdown") + + // Shut down the router; this should trigger deletion of durable consumers + env.Shutdown() + + // env.Context is cancelled by Shutdown, so use a fresh context for JetStream queries + ctx := context.Background() + streamInfo, err = stream.Info(ctx) + require.NoError(t, err) + require.Equal(t, 1, streamInfo.State.Consumers, "expected one consumer after shutdown with delete_on_shutdown disabled") + }) + t.Run("subscribing to a non-existent stream returns an error", func(t *testing.T) { t.Parallel()