Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions demo/pkg/subgraphs/subgraphs.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) {

natsPubSubByProviderID := map[string]natsPubsub.Adapter{}

defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{
defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", false, datasource.ProviderOpts{
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
})
if err != nil {
Expand All @@ -223,7 +223,7 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) {
}
natsPubSubByProviderID["default"] = defaultAdapter

myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{
myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", false, datasource.ProviderOpts{
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
})
if err != nil {
Expand Down
114 changes: 114 additions & 0 deletions router-tests/events/nats_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1200,6 +1200,120 @@ func TestNatsEvents(t *testing.T) {
})
})

t.Run("durable consumer is deleted from nats server on router shutdown", func(t *testing.T) {
Comment thread
alepane21 marked this conversation as resolved.
t.Parallel()

env, err := testenv.CreateTestEnv(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate,
EnableNats: true,
ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) {
// Remove feature flag configs to work around a bug where buildGraphMux
// overwrites s.pubSubProviders on each call, orphaning the base providers.
routerConfig.FeatureFlagConfigs = nil
},
Comment thread
dkorittki marked this conversation as resolved.
ModifyEventsConfiguration: func(cfg *config.EventsConfiguration) {
for i := range cfg.Providers.Nats {
cfg.Providers.Nats[i].Consumers.Durable.DeleteOnShutdown = true
}
},
})
require.NoError(t, err)
t.Cleanup(env.Shutdown)

js, err := jetstream.New(env.NatsConnectionDefault)
require.NoError(t, err)

streamName := env.GetPubSubName("streamName")
_, err = js.CreateOrUpdateStream(env.Context, jetstream.StreamConfig{
Name: streamName,
Subjects: []string{env.GetPubSubName("employeeUpdated.>")},
Storage: jetstream.MemoryStorage,
})
require.NoError(t, err)

conn := env.InitGraphQLWebSocketConnection(nil, nil, nil)
err = conn.WriteJSON(&testenv.WebSocketMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { employeeUpdatedNatsStream(id: 12) { id }}"}`),
})
require.NoError(t, err)
env.WaitForSubscriptionCount(1, NatsWaitTimeout)

// Verify the durable consumer was created on the stream
stream, err := js.Stream(env.Context, streamName)
require.NoError(t, err)
streamInfo, err := stream.Info(env.Context)
require.NoError(t, err)
require.Equal(t, 1, streamInfo.State.Consumers, "expected one consumer before shutdown")

// Shut down the router; this should trigger deletion of durable consumers
env.Shutdown()

// env.Context is cancelled by Shutdown, so use a fresh context for JetStream queries
ctx := context.Background()
streamInfo, err = stream.Info(ctx)
require.NoError(t, err)
require.Equal(t, 0, streamInfo.State.Consumers, "expected no consumers after shutdown with delete_on_shutdown enabled")
Comment thread
dkorittki marked this conversation as resolved.
})

t.Run("durable consumer is not deleted from nats server on router shutdown", func(t *testing.T) {
t.Parallel()

env, err := testenv.CreateTestEnv(t, &testenv.Config{
RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate,
EnableNats: true,
ModifyRouterConfig: func(routerConfig *nodev1.RouterConfig) {
// Remove feature flag configs to work around a bug where buildGraphMux
// overwrites s.pubSubProviders on each call, orphaning the base providers.
routerConfig.FeatureFlagConfigs = nil
},
ModifyEventsConfiguration: func(cfg *config.EventsConfiguration) {
for i := range cfg.Providers.Nats {
cfg.Providers.Nats[i].Consumers.Durable.DeleteOnShutdown = false
}
},
})
require.NoError(t, err)
t.Cleanup(env.Shutdown)

js, err := jetstream.New(env.NatsConnectionDefault)
require.NoError(t, err)

streamName := env.GetPubSubName("streamName")
_, err = js.CreateOrUpdateStream(env.Context, jetstream.StreamConfig{
Name: streamName,
Subjects: []string{env.GetPubSubName("employeeUpdated.>")},
Storage: jetstream.MemoryStorage,
})
require.NoError(t, err)

conn := env.InitGraphQLWebSocketConnection(nil, nil, nil)
err = conn.WriteJSON(&testenv.WebSocketMessage{
ID: "1",
Type: "subscribe",
Payload: []byte(`{"query":"subscription { employeeUpdatedNatsStream(id: 12) { id }}"}`),
})
require.NoError(t, err)
env.WaitForSubscriptionCount(1, NatsWaitTimeout)

// Verify the durable consumer was created on the stream
stream, err := js.Stream(env.Context, streamName)
require.NoError(t, err)
streamInfo, err := stream.Info(env.Context)
require.NoError(t, err)
require.Equal(t, 1, streamInfo.State.Consumers, "expected one consumer before shutdown")

// Shut down the router; this should trigger deletion of durable consumers
env.Shutdown()

// env.Context is cancelled by Shutdown, so use a fresh context for JetStream queries
ctx := context.Background()
streamInfo, err = stream.Info(ctx)
require.NoError(t, err)
require.Equal(t, 1, streamInfo.State.Consumers, "expected one consumer after shutdown with delete_on_shutdown disabled")
})

t.Run("subscribing to a non-existent stream returns an error", func(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 1 addition & 1 deletion router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2923,7 +2923,7 @@ func subgraphOptions(ctx context.Context, t testing.TB, logger *zap.Logger, nats
}
natsPubSubByProviderID := make(map[string]pubsubNats.Adapter, len(DemoNatsProviders))
for _, sourceName := range DemoNatsProviders {
adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr", datasource.ProviderOpts{
adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr", false, datasource.ProviderOpts{
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
})
require.NoError(t, err)
Expand Down
15 changes: 12 additions & 3 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,10 +616,19 @@ type NatsAuthentication struct {
NatsTokenBasedAuthentication `yaml:"token,inline"`
}

type NatsDurableConsumersConfiguration struct {
DeleteOnShutdown bool `yaml:"delete_on_shutdown" envDefault:"false"`
}

type NatsConsumersConfiguration struct {
Durable NatsDurableConsumersConfiguration `yaml:"durable,omitempty"`
}

type NatsEventSource struct {
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
Consumers NatsConsumersConfiguration `yaml:"consumers,omitempty"`
Comment thread
dkorittki marked this conversation as resolved.
}

func (n NatsEventSource) GetID() string {
Expand Down
19 changes: 19 additions & 0 deletions router/pkg/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2580,6 +2580,25 @@
}
}
]
},
"consumers": {
Comment thread
dkorittki marked this conversation as resolved.
"type": "object",
"description": "Configuration for JetStream consumers managed by this NATS provider.",
"additionalProperties": false,
"properties": {
"durable": {
"type": "object",
"description": "Configuration for durable JetStream consumers managed by this NATS provider.",
"additionalProperties": false,
"properties": {
"delete_on_shutdown": {
"type": "boolean",
"description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down normally. Defaults to false.",
"default": false
}
}
}
}
}
}
}
Expand Down
12 changes: 11 additions & 1 deletion router/pkg/config/testdata/config_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,12 @@
{
"ID": "default",
"URL": "nats://localhost:4222",
"Authentication": null
"Authentication": null,
"Consumers": {
"Durable": {
"DeleteOnShutdown": false
}
}
},
{
"ID": "my-nats",
Expand All @@ -667,6 +672,11 @@
"Username": "admin"
},
"Token": null
},
"Consumers": {
"Durable": {
"DeleteOnShutdown": false
}
}
}
],
Expand Down
101 changes: 85 additions & 16 deletions router/pkg/pubsub/nats/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type Adapter interface {
// Ensure ProviderAdapter implements ProviderSubscriptionHooks
var _ datasource.Adapter = (*ProviderAdapter)(nil)

type consumerConfig struct {
deleteOnShutdown bool
trackedConsumers sync.Map // key = consumer name, value = stream name
}

// ProviderAdapter implements the AdapterInterface for NATS pub/sub
type ProviderAdapter struct {
ctx context.Context
Expand All @@ -46,6 +51,7 @@ type ProviderAdapter struct {
opts []nats.Option
flushTimeout time.Duration
streamMetricStore metric.StreamMetricStore
consumerConfig consumerConfig
}

// getInstanceIdentifier returns an identifier for the current instance.
Expand Down Expand Up @@ -97,23 +103,12 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, cfg datasource.Subscrip
}

if subConf.StreamConfiguration != nil {
durableConsumerName, err := p.getDurableConsumerName(subConf.StreamConfiguration.Consumer, subConf.Subjects)
if err != nil {
return err
}
consumerConfig := jetstream.ConsumerConfig{
Durable: durableConsumerName,
FilterSubjects: subConf.Subjects,
}
// Durable consumers are removed automatically only if the InactiveThreshold value is set
if subConf.StreamConfiguration.ConsumerInactiveThreshold > 0 {
consumerConfig.InactiveThreshold = time.Duration(subConf.StreamConfiguration.ConsumerInactiveThreshold) * time.Second
}

consumer, err := p.js.CreateOrUpdateConsumer(ctx, subConf.StreamConfiguration.StreamName, consumerConfig)
consumer, err := p.createOrUpdateDurableConsumer(ctx, subConf)
if err != nil {
log.Error("creating or updating consumer", zap.Error(err))
return datasource.NewError(fmt.Sprintf(`failed to create or update consumer for stream "%s"`, subConf.StreamConfiguration.StreamName), err)
return datasource.NewError(
fmt.Sprintf(`failed to create or update consumer for stream "%s"`, subConf.StreamConfiguration.StreamName), err,
)
}

p.closeWg.Add(1)
Expand Down Expand Up @@ -383,6 +378,10 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error {

var shutdownErr error

if p.consumerConfig.deleteOnShutdown {
Comment thread
alepane21 marked this conversation as resolved.
p.deleteDurableConsumers(ctx)
}

fErr := p.flush(ctx)
if fErr != nil {
shutdownErr = errors.Join(shutdownErr, fErr)
Expand All @@ -407,7 +406,74 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error {
return nil
}

func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, providerOpts datasource.ProviderOpts) (Adapter, error) {
// createOrUpdateDurableConsumer creates or updates the durable consumer on the nats server,
// based on configuration settings from the adapter and subConf.
// It computes the consumer name, adds or updates the consumer on the nats server and
// keeps track of added consumers for later deletion.
func (p *ProviderAdapter) createOrUpdateDurableConsumer(ctx context.Context, subConf *SubscriptionEventConfiguration) (jetstream.Consumer, error) {
if p.js == nil {
return nil, nil
}

durableConsumerName, err := p.getDurableConsumerName(subConf.StreamConfiguration.Consumer, subConf.Subjects)
if err != nil {
return nil, fmt.Errorf("compute consumer name: %w", err)
}
consumerConfig := jetstream.ConsumerConfig{
Durable: durableConsumerName,
FilterSubjects: subConf.Subjects,
}
// Durable consumers are removed automatically only if the InactiveThreshold value is set
if subConf.StreamConfiguration.ConsumerInactiveThreshold > 0 {
consumerConfig.InactiveThreshold = time.Duration(subConf.StreamConfiguration.ConsumerInactiveThreshold) * time.Second
}

consumer, err := p.js.CreateOrUpdateConsumer(ctx, subConf.StreamConfiguration.StreamName, consumerConfig)
if err != nil {
return nil, fmt.Errorf("create or update consumer on nats: %w", err)
}

// Track newly created durable consumer so we can later delete them, if it's necessary.
if p.consumerConfig.deleteOnShutdown {
p.consumerConfig.trackedConsumers.Store(durableConsumerName, subConf.StreamConfiguration.StreamName)
}

return consumer, nil
}

// deleteDurableConsumers deletes all durable consumers used by this router instance from the nats server.
func (p *ProviderAdapter) deleteDurableConsumers(ctx context.Context) {
if p.js == nil {
return
}

p.consumerConfig.trackedConsumers.Range(func(key, value any) bool {
consumerName, ok := key.(string)
if !ok {
// skip this odd element, should not happen in reality
return true
}

streamName, ok := value.(string)
if !ok {
// skip this odd element, should not happen in reality
return true
}

err := p.js.DeleteConsumer(ctx, streamName, consumerName)
Comment thread
StarpTech marked this conversation as resolved.
if err != nil {
p.logger.Warn("failed to delete durable consumer on shutdown",
zap.String("stream", streamName),
zap.String("consumer", consumerName),
zap.Error(err),
)
}

return true
})
}

func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, deleteConsumersOnShutdown bool, providerOpts datasource.ProviderOpts) (Adapter, error) {
if logger == nil {
logger = zap.NewNop()
}
Expand All @@ -432,6 +498,9 @@ func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats
opts: opts,
flushTimeout: 10 * time.Second,
streamMetricStore: store,
consumerConfig: consumerConfig{
deleteOnShutdown: deleteConsumersOnShutdown,
},
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion router/pkg/pubsub/nats/provider_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func buildProvider(ctx context.Context, provider config.NatsEventSource, logger
return nil, fmt.Errorf("failed to build options for Nats provider with ID \"%s\": %w", provider.ID, err)
}

adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, providerOpts)
adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.Consumers.Durable.DeleteOnShutdown, providerOpts)
if err != nil {
return nil, fmt.Errorf("failed to create adapter for Nats provider with ID \"%s\": %w", provider.ID, err)
}
Expand Down
Loading