Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions demo/pkg/subgraphs/subgraphs.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) {

natsPubSubByProviderID := map[string]natsPubsub.Adapter{}

defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{
defaultAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", false, datasource.ProviderOpts{
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
})
if err != nil {
Expand All @@ -223,7 +223,7 @@ func New(ctx context.Context, config *Config) (*Subgraphs, error) {
}
natsPubSubByProviderID["default"] = defaultAdapter

myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", datasource.ProviderOpts{
myNatsAdapter, err := natsPubsub.NewAdapter(ctx, zap.NewNop(), url, []nats.Option{}, "hostname", "test", false, datasource.ProviderOpts{
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion router-tests/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2911,7 +2911,7 @@ func subgraphOptions(ctx context.Context, t testing.TB, logger *zap.Logger, nats
}
natsPubSubByProviderID := make(map[string]pubsubNats.Adapter, len(DemoNatsProviders))
for _, sourceName := range DemoNatsProviders {
adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr", datasource.ProviderOpts{
adapter, err := pubsubNats.NewAdapter(ctx, logger, natsData.Params[0].Url, natsData.Params[0].Opts, "hostname", "listenaddr", false, datasource.ProviderOpts{
StreamMetricStore: rmetric.NewNoopStreamMetricStore(),
})
require.NoError(t, err)
Expand Down
11 changes: 8 additions & 3 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,10 +616,15 @@ type NatsAuthentication struct {
NatsTokenBasedAuthentication `yaml:"token,inline"`
}

type NatsConsumersConfiguration struct {
Comment thread
dkorittki marked this conversation as resolved.
Outdated
DeleteOnShutdown bool `yaml:"delete_on_shutdown" envDefault:"false"`
}

type NatsEventSource struct {
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
Consumers NatsConsumersConfiguration `yaml:"consumers,omitempty"`
Comment thread
dkorittki marked this conversation as resolved.
}

func (n NatsEventSource) GetID() string {
Expand Down
12 changes: 12 additions & 0 deletions router/pkg/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2580,6 +2580,18 @@
}
}
]
},
"consumers": {
Comment thread
dkorittki marked this conversation as resolved.
"type": "object",
"description": "Configuration for durable JetStream consumers managed by this NATS provider.",
"additionalProperties": false,
"properties": {
"delete_on_shutdown": {
"type": "boolean",
"description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down. This prevents zombie consumers accumulating on the NATS server across Kubernetes rolling deployments where each pod has a unique hostname baked into the consumer name. Defaults to false.",
"default": false
}
}
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion router/pkg/config/testdata/config_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,10 @@
{
"ID": "default",
"URL": "nats://localhost:4222",
"Authentication": null
"Authentication": null,
"Consumers": {
"DeleteOnShutdown": false
}
},
{
"ID": "my-nats",
Expand All @@ -667,6 +670,9 @@
"Username": "admin"
},
"Token": null
},
"Consumers": {
"DeleteOnShutdown": false
}
}
],
Expand Down
44 changes: 43 additions & 1 deletion router/pkg/pubsub/nats/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ type Adapter interface {
// Ensure ProviderAdapter implements ProviderSubscriptionHooks
var _ datasource.Adapter = (*ProviderAdapter)(nil)

type consumerConfig struct {
deleteOnShutdown bool
trackedConsumers sync.Map // used as a set, key = consumer name, value = stream name
}

// ProviderAdapter implements the AdapterInterface for NATS pub/sub
type ProviderAdapter struct {
ctx context.Context
Expand All @@ -46,6 +51,7 @@ type ProviderAdapter struct {
opts []nats.Option
flushTimeout time.Duration
streamMetricStore metric.StreamMetricStore
consumerConfig consumerConfig
}

// getInstanceIdentifier returns an identifier for the current instance.
Expand Down Expand Up @@ -116,6 +122,11 @@ func (p *ProviderAdapter) Subscribe(ctx context.Context, cfg datasource.Subscrip
return datasource.NewError(fmt.Sprintf(`failed to create or update consumer for stream "%s"`, subConf.StreamConfiguration.StreamName), err)
}

// Track newly created durable consumer so we can later delete them, if it's necessary.
if p.consumerConfig.deleteOnShutdown {
p.consumerConfig.trackedConsumers.Store(durableConsumerName, subConf.StreamConfiguration.StreamName)
}

p.closeWg.Add(1)

go func() {
Expand Down Expand Up @@ -383,6 +394,34 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error {

var shutdownErr error

// Delete durable consumers before closing the connection.
if p.consumerConfig.deleteOnShutdown && p.js != nil {
p.consumerConfig.trackedConsumers.Range(func(key, value any) bool {
consumerName, ok := key.(string)
if !ok {
// skip this odd element, should not happen in reality
return true
}

streamName, ok := value.(string)
if !ok {
// skip this odd element, should not happen in reality
return true
}

err := p.js.DeleteConsumer(ctx, streamName, consumerName)
if err != nil {
p.logger.Warn("failed to delete durable consumer on shutdown",
zap.String("stream", streamName),
zap.String("consumer", consumerName),
zap.Error(err),
)
}

return true
})
}

fErr := p.flush(ctx)
if fErr != nil {
shutdownErr = errors.Join(shutdownErr, fErr)
Expand All @@ -407,7 +446,7 @@ func (p *ProviderAdapter) Shutdown(ctx context.Context) error {
return nil
}

func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, providerOpts datasource.ProviderOpts) (Adapter, error) {
func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats.Option, hostName string, routerListenAddr string, deleteConsumersOnShutdown bool, providerOpts datasource.ProviderOpts) (Adapter, error) {
if logger == nil {
logger = zap.NewNop()
}
Expand All @@ -432,6 +471,9 @@ func NewAdapter(ctx context.Context, logger *zap.Logger, url string, opts []nats
opts: opts,
flushTimeout: 10 * time.Second,
streamMetricStore: store,
consumerConfig: consumerConfig{
deleteOnShutdown: deleteConsumersOnShutdown,
},
}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion router/pkg/pubsub/nats/provider_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func buildProvider(ctx context.Context, provider config.NatsEventSource, logger
return nil, fmt.Errorf("failed to build options for Nats provider with ID \"%s\": %w", provider.ID, err)
}

adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, providerOpts)
adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.DeleteDurableConsumersOnShutdown, providerOpts)
if err != nil {
return nil, fmt.Errorf("failed to create adapter for Nats provider with ID \"%s\": %w", provider.ID, err)
}
Expand Down
Loading