diff --git a/demo/pkg/subgraphs/subgraphs.go b/demo/pkg/subgraphs/subgraphs.go index 3b5d294fca..44764f14bd 100644 --- a/demo/pkg/subgraphs/subgraphs.go +++ b/demo/pkg/subgraphs/subgraphs.go @@ -212,7 +212,7 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) { natsPubSubByProviderID := map[string]natsPubsub.Adapter{} - defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{ + defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", false, datasource.ProviderOpts{ StreamMetricStore: rmetric.NewNoopStreamMetricStore(), }) if err != nil { @@ -223,7 +223,7 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) { } natsPubSubByProviderID["default"] = defaultAdapter - myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{ + myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", false, datasource.ProviderOpts{ StreamMetricStore: rmetric.NewNoopStreamMetricStore(), }) if err != nil { diff --git a/router-tests/events/nats_events_test.go b/router-tests/events/nats_events_test.go index 5d3b2ec2cf..3a42dc1467 100644 --- a/router-tests/events/nats_events_test.go +++ b/router-tests/events/nats_events_test.go @@ -1200,6 +1200,120 @@ func TestNatsEvents(t *testing.T) { }) }) + t.Run("durable consumer is deleted from nats server on router shutdown", func(t *testing.T) { + t.Parallel() + + env, err := testenv.CreateTestEnv(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + // Remove feature flag configs to work around a bug where buildGraphMux + // overwrites s.pubSubProviders on each call, orphaning the base providers. 
+ routerConfig.FeatureFlagConfigs = nil + }, + ModifyEventsConfiguration: func(cfg *config.EventsConfiguration) { + for i := range cfg.Providers.Nats { + cfg.Providers.Nats[i].Consumers.Durable.DeleteOnShutdown = true + } + }, + }) + require.NoError(t, err) + t.Cleanup(env.Shutdown) + + js, err := jetstream.New(env.NatsConnectionDefault) + require.NoError(t, err) + + streamName := env.GetPubSubName("streamName") + _, err = js.CreateOrUpdateStream(env.Context, jetstream.StreamConfig{ + Name: streamName, + Subjects: []string{env.GetPubSubName("employeeUpdated.>")}, + Storage: jetstream.MemoryStorage, + }) + require.NoError(t, err) + + conn := env.InitGraphQLWebSocketConnection(nil, nil, nil) + err = conn.WriteJSON(&testenv.WebSocketMessage{ + ID: "1", + Type: "subscribe", + Payload: []byte(`{"query":"subscription { employeeUpdatedNatsStream(id: 12) { id }}"}`), + }) + require.NoError(t, err) + env.WaitForSubscriptionCount(1, NatsWaitTimeout) + + // Verify the durable consumer was created on the stream + stream, err := js.Stream(env.Context, streamName) + require.NoError(t, err) + streamInfo, err := stream.Info(env.Context) + require.NoError(t, err) + require.Equal(t, 1, streamInfo.State.Consumers, "expected one consumer before shutdown") + + // Shut down the router; this should trigger deletion of durable consumers + env.Shutdown() + + // env.Context is cancelled by Shutdown, so use a fresh context for JetStream queries + ctx := context.Background() + streamInfo, err = stream.Info(ctx) + require.NoError(t, err) + require.Equal(t, 0, streamInfo.State.Consumers, "expected no consumers after shutdown with delete_on_shutdown enabled") + }) + + t.Run("durable consumer is not deleted from nats server on router shutdown", func(t *testing.T) { + t.Parallel() + + env, err := testenv.CreateTestEnv(t, &testenv.Config{ + RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate, + EnableNats: true, + ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) { + // 
Remove feature flag configs to work around a bug where buildGraphMux + // overwrites s.pubSubProviders on each call, orphaning the base providers. + routerConfig.FeatureFlagConfigs = nil + }, + ModifyEventsConfiguration: func(cfg *config.EventsConfiguration) { + for i := range cfg.Providers.Nats { + cfg.Providers.Nats[i].Consumers.Durable.DeleteOnShutdown = false + } + }, + }) + require.NoError(t, err) + t.Cleanup(env.Shutdown) + + js, err := jetstream.New(env.NatsConnectionDefault) + require.NoError(t, err) + + streamName := env.GetPubSubName("streamName") + _, err = js.CreateOrUpdateStream(env.Context, jetstream.StreamConfig{ + Name: streamName, + Subjects: []string{env.GetPubSubName("employeeUpdated.>")}, + Storage: jetstream.MemoryStorage, + }) + require.NoError(t, err) + + conn := env.InitGraphQLWebSocketConnection(nil, nil, nil) + err = conn.WriteJSON(&testenv.WebSocketMessage{ + ID: "1", + Type: "subscribe", + Payload: []byte(`{"query":"subscription { employeeUpdatedNatsStream(id: 12) { id }}"}`), + }) + require.NoError(t, err) + env.WaitForSubscriptionCount(1, NatsWaitTimeout) + + // Verify the durable consumer was created on the stream + stream, err := js.Stream(env.Context, streamName) + require.NoError(t, err) + streamInfo, err := stream.Info(env.Context) + require.NoError(t, err) + require.Equal(t, 1, streamInfo.State.Consumers, "expected one consumer before shutdown") + + // Shut down the router; with delete_on_shutdown disabled the durable consumer must be left in place + env.Shutdown() + + // env.Context is cancelled by Shutdown, so use a fresh context for JetStream queries + ctx := context.Background() + streamInfo, err = stream.Info(ctx) + require.NoError(t, err) + require.Equal(t, 1, streamInfo.State.Consumers, "expected one consumer after shutdown with delete_on_shutdown disabled") + }) + t.Run("subscribing to a non-existent stream returns an error", func(t *testing.T) { t.Parallel() diff --git a/router-tests/testenv/testenv.go b/router-tests/testenv/testenv.go index 
7fabc4a2b8..7e0cb2014f 100644 --- a/router-tests/testenv/testenv.go +++ b/router-tests/testenv/testenv.go @@ -2923,7 +2923,7 @@ func subgraphOptions(ctx context.Context, t testing.TB, logger *zap.Logger, nats } natsPubSubByProviderID := make(map[string]pubsubNats.Adapter, len(DemoNatsProviders)) for _, sourceName := range DemoNatsProviders { - adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr", datasource.ProviderOpts{ + adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr", false, datasource.ProviderOpts{ StreamMetricStore: rmetric.NewNoopStreamMetricStore(), }) require.NoError(t, err) diff --git a/router/pkg/config/config.go b/router/pkg/config/config.go index e50a2e510d..ba52f28ba7 100644 --- a/router/pkg/config/config.go +++ b/router/pkg/config/config.go @@ -616,10 +616,19 @@ type NatsAuthentication struct { NatsTokenBasedAuthentication `yaml:"token,inline"` } +type NatsDurableConsumersConfiguration struct { + DeleteOnShutdown bool `yaml:"delete_on_shutdown" envDefault:"false"` +} + +type NatsConsumersConfiguration struct { + Durable NatsDurableConsumersConfiguration `yaml:"durable,omitempty"` +} + type NatsEventSource struct { - ID string `yaml:"id,omitempty"` - URL string `yaml:"url,omitempty"` - Authentication *NatsAuthentication `yaml:"authentication,omitempty"` + ID string `yaml:"id,omitempty"` + URL string `yaml:"url,omitempty"` + Authentication *NatsAuthentication `yaml:"authentication,omitempty"` + Consumers NatsConsumersConfiguration `yaml:"consumers,omitempty"` } func (n NatsEventSource) GetID() string { diff --git a/router/pkg/config/config.schema.json b/router/pkg/config/config.schema.json index 2948ea0e46..ac01bedeb7 100644 --- a/router/pkg/config/config.schema.json +++ b/router/pkg/config/config.schema.json @@ -2580,6 +2580,25 @@ } } ] + }, + "consumers": { + "type": "object", + "description": 
"Configuration for JetStream consumers managed by this NATS provider.", + "additionalProperties": false, + "properties": { + "durable": { + "type": "object", + "description": "Configuration for durable JetStream consumers managed by this NATS provider.", + "additionalProperties": false, + "properties": { + "delete_on_shutdown": { + "type": "boolean", + "description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down normally. Defaults to false.", + "default": false + } + } + } + } } } } diff --git a/router/pkg/config/testdata/config_full.json b/router/pkg/config/testdata/config_full.json index f97deeea5e..e9e85f1e97 100644 --- a/router/pkg/config/testdata/config_full.json +++ b/router/pkg/config/testdata/config_full.json @@ -656,7 +656,12 @@ { "ID": "default", "URL": "nats://localhost:4222", - "Authentication": null + "Authentication": null, + "Consumers": { + "Durable": { + "DeleteOnShutdown": false + } + } }, { "ID": "my-nats", @@ -667,6 +672,11 @@ "Username": "admin" }, "Token": null + }, + "Consumers": { + "Durable": { + "DeleteOnShutdown": false + } } } ], diff --git a/router/pkg/pubsub/nats/adapter.go b/router/pkg/pubsub/nats/adapter.go index 0c9c0c140f..7ac8cbbc1d 100644 --- a/router/pkg/pubsub/nats/adapter.go +++ b/router/pkg/pubsub/nats/adapter.go @@ -32,6 +32,11 @@ type Adapter interface { // Ensure ProviderAdapter implements ProviderSubscriptionHooks var _ datasource.Adapter = (*ProviderAdapter)(nil) +type consumerConfig struct { + deleteOnShutdown bool + trackedConsumers sync.Map // key = consumer name, value = stream name +} + // ProviderAdapter implements the AdapterInterface for NATS pub/sub type ProviderAdapter struct { ctx context.Context @@ -46,6 +51,7 @@ type ProviderAdapter struct { opts []nats.Option flushTimeout time.Duration streamMetricStore metric.StreamMetricStore + consumerConfig consumerConfig } // getInstanceIdentifier returns an identifier for the current instance. 
@@ -97,23 +103,12 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, cfg datasource.Subscrip
 	}
 
 	if subConf.StreamConfiguration != nil {
-		durableConsumerName, err := p.getDurableConsumerName(subConf.StreamConfiguration.Consumer, subConf.Subjects)
-		if err != nil {
-			return err
-		}
-		consumerConfig := jetstream.ConsumerConfig{
-			Durable:        durableConsumerName,
-			FilterSubjects: subConf.Subjects,
-		}
-		// Durable consumers are removed automatically only if the InactiveThreshold value is set
-		if subConf.StreamConfiguration.ConsumerInactiveThreshold > 0 {
-			consumerConfig.InactiveThreshold = time.Duration(subConf.StreamConfiguration.ConsumerInactiveThreshold) * time.Second
-		}
-
-		consumer, err := p.js.CreateOrUpdateConsumer(ctx, subConf.StreamConfiguration.StreamName, consumerConfig)
+		consumer, err := p.createOrUpdateDurableConsumer(ctx, subConf)
 		if err != nil {
 			log.Error("creating or updating consumer", zap.Error(err))
-			return datasource.NewError(fmt.Sprintf(`failed to create or update consumer for stream "%s"`, subConf.StreamConfiguration.StreamName), err)
+			return datasource.NewError(
+				fmt.Sprintf(`failed to create or update consumer for stream "%s"`, subConf.StreamConfiguration.StreamName), err,
+			)
 		}
 
 		p.closeWg.Add(1)
@@ -383,6 +378,10 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error {
 
 	var shutdownErr error
 
+	if p.consumerConfig.deleteOnShutdown {
+		p.deleteDurableConsumers(ctx)
+	}
+
 	fErr := p.flush(ctx)
 	if fErr != nil {
 		shutdownErr = errors.Join(shutdownErr, fErr)
@@ -407,7 +406,74 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error {
 	return nil
 }
 
-func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, providerOpts datasource.ProviderOpts) (Adapter, error) {
+// createOrUpdateDurableConsumer creates or updates the durable JetStream consumer described by subConf
+// and, when delete-on-shutdown is enabled, records it in trackedConsumers for deletion on Shutdown.
+// It returns an error, never a nil consumer with a nil error, when JetStream is not initialized,
+// the consumer name cannot be computed, or the server rejects the request.
+func (p *ProviderAdapter) createOrUpdateDurableConsumer(ctx context.Context, subConf *SubscriptionEventConfiguration) (jetstream.Consumer, error) {
+	if p.js == nil {
+		return nil, errors.New("nats jetstream is not initialized")
+	}
+
+	durableConsumerName, err := p.getDurableConsumerName(subConf.StreamConfiguration.Consumer, subConf.Subjects)
+	if err != nil {
+		return nil, fmt.Errorf("compute consumer name: %w", err)
+	}
+	consumerConfig := jetstream.ConsumerConfig{
+		Durable:        durableConsumerName,
+		FilterSubjects: subConf.Subjects,
+	}
+	// Durable consumers are removed automatically only if the InactiveThreshold value is set
+	if subConf.StreamConfiguration.ConsumerInactiveThreshold > 0 {
+		consumerConfig.InactiveThreshold = time.Duration(subConf.StreamConfiguration.ConsumerInactiveThreshold) * time.Second
+	}
+
+	consumer, err := p.js.CreateOrUpdateConsumer(ctx, subConf.StreamConfiguration.StreamName, consumerConfig)
+	if err != nil {
+		return nil, fmt.Errorf("create or update consumer on nats: %w", err)
+	}
+
+	// Remember the consumer (name -> stream) only when it has to be deleted on shutdown.
+	if p.consumerConfig.deleteOnShutdown {
+		p.consumerConfig.trackedConsumers.Store(durableConsumerName, subConf.StreamConfiguration.StreamName)
+	}
+
+	return consumer, nil
+}
+
+// deleteDurableConsumers deletes all durable consumers used by this router instance from the nats server.
+func (p *ProviderAdapter) deleteDurableConsumers(ctx context.Context) {
+	if p.js == nil {
+		return
+	}
+
+	p.consumerConfig.trackedConsumers.Range(func(key, value any) bool {
+		consumerName, nameOK := key.(string)
+		streamName, streamOK := value.(string)
+		if !nameOK || !streamOK {
+			// skip this odd element, should not happen in reality
+			return true
+		}
+
+		err := p.js.DeleteConsumer(ctx, streamName, consumerName)
+		// A consumer that is already gone (e.g. expired via InactiveThreshold) needs no cleanup.
+		if err != nil && !errors.Is(err, jetstream.ErrConsumerNotFound) {
+			p.logger.Warn("failed to delete durable consumer on shutdown",
+				zap.String("stream", streamName),
+				zap.String("consumer", consumerName),
+				zap.Error(err),
+			)
+			return true
+		}
+
+		// Untrack the consumer so a repeated Shutdown does not try to delete it again.
+		p.consumerConfig.trackedConsumers.Delete(key)
+
+		return true
+	})
+}
+
+func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, deleteConsumersOnShutdown bool, providerOpts datasource.ProviderOpts) (Adapter, error) {
 	if logger == nil {
 		logger = zap.NewNop()
 	}
@@ -432,6 +498,9 @@ func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats
 		opts:              opts,
 		flushTimeout:      10 * time.Second,
 		streamMetricStore: store,
+		consumerConfig: consumerConfig{
+			deleteOnShutdown: deleteConsumersOnShutdown,
+		},
 	}, nil
 }
 
diff --git a/router/pkg/pubsub/nats/provider_builder.go b/router/pkg/pubsub/nats/provider_builder.go
index d8314305ae..17b1e4febb 100644
--- a/router/pkg/pubsub/nats/provider_builder.go
+++ b/router/pkg/pubsub/nats/provider_builder.go
@@ -123,7 +123,7 @@ func buildProvider(ctx context.Context, provider config.NatsEventSource, logger
 		return nil, fmt.Errorf("failed to build options for Nats provider with ID \"%s\": %w", provider.ID, err)
 	}
 
-	adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, providerOpts)
+	adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.Consumers.Durable.DeleteOnShutdown, providerOpts)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create adapter for Nats provider with ID \"%s\": %w", provider.ID, err)
 	}