feat: option to remove nats consumers on shutdown#2600
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughAdds per-provider NATS consumer configuration and optional deletion of durable JetStream consumers on shutdown; extends adapter API with a delete-on-shutdown boolean, tracks durable consumers, deletes them during Shutdown when enabled, and updates call sites, config/schema, test fixtures, and tests. Changes
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Tip Try Coding Plans. Let us write the prompt for your AI agent so you can ship faster (with fewer bugs). Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Router-nonroot image scan passed✅ No security vulnerabilities found in image: |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
router/pkg/config/config.go (1)
619-624: Consider adding anenvtag for environment variable configuration.The new
DeleteDurableConsumersOnShutdownfield hasenvDefault:"false"but noenvtag, which means it cannot be configured via environment variables. If this feature should be configurable through environment variables (common for operational settings), consider adding an env tag.💡 Optional: Add env tag for environment variable support
type NatsEventSource struct { ID string `yaml:"id,omitempty"` URL string `yaml:"url,omitempty"` Authentication *NatsAuthentication `yaml:"authentication,omitempty"` - DeleteDurableConsumersOnShutdown bool `yaml:"delete_durable_consumers_on_shutdown" envDefault:"false"` + DeleteDurableConsumersOnShutdown bool `yaml:"delete_durable_consumers_on_shutdown" envDefault:"false" env:"NATS_DELETE_DURABLE_CONSUMERS_ON_SHUTDOWN"` }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router/pkg/config/config.go` around lines 619 - 624, The struct NatsEventSource's field DeleteDurableConsumersOnShutdown has envDefault:"false" but no env tag, so it can't be set from environment; update the NatsEventSource struct definition to add an env tag (e.g., env:"DELETE_DURABLE_CONSUMERS_ON_SHUTDOWN") to the DeleteDurableConsumersOnShutdown field so the env parsing library can map an environment variable to this boolean, keeping the existing yaml tag and envDefault intact; ensure the chosen env name follows project conventions and update any docs if needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@router/pkg/pubsub/nats/adapter.go`:
- Around line 55-57: The trackedConsumers slice should be replaced with a set to
avoid duplicate durable consumer entries: change the field trackedConsumers
[]trackedConsumer to trackedConsumers map[trackedConsumer]struct{} and
initialize it (e.g., in NewAdapter) and update any append sites (where
getDurableConsumerName() is used to identify the consumer) to insert into the
map (trackedConsumers[tc] = struct{}{}) under trackedConsumersMu instead of
appending; also update shutdown logic to iterate map keys to delete unique
consumers and clear the map under the same mutex.
- Around line 404-419: When p.deleteConsumersOnShutdown is true and p.js != nil
the code currently only logs DeleteConsumer failures; instead collect/aggregate
any errors from p.js.DeleteConsumer (while still logging), and return a non-nil
error from Shutdown() so callers see the failure; use the existing
p.trackedConsumersMu, p.trackedConsumers and the loop over tc to capture the
first (or combined) error from DeleteConsumer calls and return it from
Shutdown() (preserving the current zap.Warn logs but also propagating the error
out of Shutdown()).
---
Nitpick comments:
In `@router/pkg/config/config.go`:
- Around line 619-624: The struct NatsEventSource's field
DeleteDurableConsumersOnShutdown has envDefault:"false" but no env tag, so it
can't be set from environment; update the NatsEventSource struct definition to
add an env tag (e.g., env:"DELETE_DURABLE_CONSUMERS_ON_SHUTDOWN") to the
DeleteDurableConsumersOnShutdown field so the env parsing library can map an
environment variable to this boolean, keeping the existing yaml tag and
envDefault intact; ensure the chosen env name follows project conventions and
update any docs if needed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 207daf15-64fa-4d2b-b856-50c437454f19
📒 Files selected for processing (7)
demo/pkg/subgraphs/subgraphs.gorouter-tests/testenv/testenv.gorouter/pkg/config/config.gorouter/pkg/config/config.schema.jsonrouter/pkg/config/testdata/config_full.jsonrouter/pkg/pubsub/nats/adapter.gorouter/pkg/pubsub/nats/provider_builder.go
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2600 +/- ##
===========================================
+ Coverage 41.42% 62.13% +20.71%
===========================================
Files 799 244 -555
Lines 118919 25809 -93110
Branches 9618 0 -9618
===========================================
- Hits 49259 16036 -33223
+ Misses 69298 8395 -60903
- Partials 362 1378 +1016
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
router/pkg/config/testdata/config_full.json (1)
659-676:⚠️ Potential issue | 🟡 MinorThis snapshot won't catch a broken
consumersmapping.Both providers assert
DeleteOnShutdown: false, butNatsEventSource.Consumersinrouter/pkg/config/config.go:623-627is a value field. If the source fixture omitsconsumersor it is wired to the wrong YAML path,TestLoadFullConfigwould still serialize to the same JSON. Set one provider totrueso the snapshot actually verifies the new field end to end.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router/pkg/config/testdata/config_full.json` around lines 659 - 676, The snapshot is insensitive to a missing/miswired consumers mapping because both providers in testdata/config_full.json have Consumers.DeleteOnShutdown:false; update the fixture so one provider (e.g., the NATS provider with ID "my-nats") sets Consumers.DeleteOnShutdown:true to exercise the mapping end-to-end and ensure NatsEventSource.Consumers (in router/pkg/config/config.go) is actually pulled into TestLoadFullConfig's serialization; confirm the JSON/YAML key matches the expected "consumers" path/casing so the test will fail if the field is omitted or miswired.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@router/pkg/config/testdata/config_full.json`:
- Around line 659-676: The snapshot is insensitive to a missing/miswired
consumers mapping because both providers in testdata/config_full.json have
Consumers.DeleteOnShutdown:false; update the fixture so one provider (e.g., the
NATS provider with ID "my-nats") sets Consumers.DeleteOnShutdown:true to
exercise the mapping end-to-end and ensure NatsEventSource.Consumers (in
router/pkg/config/config.go) is actually pulled into TestLoadFullConfig's
serialization; confirm the JSON/YAML key matches the expected "consumers"
path/casing so the test will fail if the field is omitted or miswired.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2a34b0f4-15d1-4890-bd01-772d1cafaf93
📒 Files selected for processing (4)
router/pkg/config/config.gorouter/pkg/config/config.schema.jsonrouter/pkg/config/testdata/config_full.jsonrouter/pkg/pubsub/nats/adapter.go
🚧 Files skipped from review as they are similar to previous changes (1)
- router/pkg/pubsub/nats/adapter.go
There was a problem hiding this comment.
♻️ Duplicate comments (1)
router/pkg/pubsub/nats/adapter.go (1)
381-383:⚠️ Potential issue | 🟠 MajorConsumer deletion errors are not propagated to callers.
deleteDurableConsumersonly logs failures but doesn't return errors. When this flag is enabled, failedDeleteConsumercalls represent the exact failure mode callers need to see - a consumer leak. Currently,Shutdown()will return success even when consumers fail to delete.Consider returning errors from
deleteDurableConsumersand joining them withshutdownErr:Suggested approach
-func (p *ProviderAdapter) deleteDurableConsumers(ctx context.Context) { +func (p *ProviderAdapter) deleteDurableConsumers(ctx context.Context) error { if p.js == nil { - return + return nil } + var errs error p.consumerConfig.trackedConsumers.Range(func(key, value any) bool { consumerName, ok := key.(string) if !ok { return true } streamName, ok := value.(string) if !ok { return true } err := p.js.DeleteConsumer(ctx, streamName, consumerName) if err != nil { p.logger.Warn("failed to delete durable consumer on shutdown", zap.String("stream", streamName), zap.String("consumer", consumerName), zap.Error(err), ) + errs = errors.Join(errs, fmt.Errorf("delete consumer %s/%s: %w", streamName, consumerName, err)) } return true }) + return errs }Then in
Shutdown:if p.consumerConfig.deleteOnShutdown { - p.deleteDurableConsumers(ctx) + if err := p.deleteDurableConsumers(ctx); err != nil { + shutdownErr = errors.Join(shutdownErr, err) + } }Also applies to: 444-474
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router/pkg/pubsub/nats/adapter.go` around lines 381 - 383, The deleteDurableConsumers helper currently only logs errors, so Shutdown returns success even when DeleteConsumer fails; change deleteDurableConsumers to return an error (aggregate/multierror) and update Shutdown to call it when p.consumerConfig.deleteOnShutdown is true, capture its returned error and join/append it to shutdownErr (preserving existing shutdown error behavior) so caller sees consumer deletion failures; reference functions: deleteDurableConsumers, Shutdown, DeleteConsumer, shutdownErr, and field consumerConfig.deleteOnShutdown.
🧹 Nitpick comments (1)
router/pkg/pubsub/nats/adapter.go (1)
414-416: Consider returning an error instead ofnil, nilwhen JetStream is unavailable.Returning
nil, nilwhenp.js == nilcould lead to nil pointer dereferences if this method is called from new code paths that don't pre-check. WhileSubscribecurrently guards against this (lines 101-103), an explicit error is safer and more self-documenting.Suggested fix
if p.js == nil { - return nil, nil + return nil, errors.New("jetstream not initialized") }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router/pkg/pubsub/nats/adapter.go` around lines 414 - 416, The method currently returns nil, nil when p.js == nil which hides the JetStream-unavailable condition; change the return to an explicit error (e.g., return nil, errors.New("jetstream unavailable") or fmt.Errorf(...)) so callers receive a non-nil error instead of a silent nil subscription, and update any direct callers if needed; refer to p.js in adapter.go and ensure behavior aligns with Subscribe (which already checks p.js) so downstream code can handle the error safely.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@router/pkg/pubsub/nats/adapter.go`:
- Around line 381-383: The deleteDurableConsumers helper currently only logs
errors, so Shutdown returns success even when DeleteConsumer fails; change
deleteDurableConsumers to return an error (aggregate/multierror) and update
Shutdown to call it when p.consumerConfig.deleteOnShutdown is true, capture its
returned error and join/append it to shutdownErr (preserving existing shutdown
error behavior) so caller sees consumer deletion failures; reference functions:
deleteDurableConsumers, Shutdown, DeleteConsumer, shutdownErr, and field
consumerConfig.deleteOnShutdown.
---
Nitpick comments:
In `@router/pkg/pubsub/nats/adapter.go`:
- Around line 414-416: The method currently returns nil, nil when p.js == nil
which hides the JetStream-unavailable condition; change the return to an
explicit error (e.g., return nil, errors.New("jetstream unavailable") or
fmt.Errorf(...)) so callers receive a non-nil error instead of a silent nil
subscription, and update any direct callers if needed; refer to p.js in
adapter.go and ensure behavior aligns with Subscribe (which already checks p.js)
so downstream code can handle the error safely.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 190815ef-5976-465c-bf61-71ee4f596288
📒 Files selected for processing (3)
router/pkg/config/config.schema.jsonrouter/pkg/pubsub/nats/adapter.gorouter/pkg/pubsub/nats/provider_builder.go
🚧 Files skipped from review as they are similar to previous changes (2)
- router/pkg/config/config.schema.json
- router/pkg/pubsub/nats/provider_builder.go
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@router-tests/events/nats_events_test.go`:
- Around line 1209-1213: The test is masking a real bug by nil-ing
routerConfig.FeatureFlagConfigs; instead restore realistic feature flags in the
test and fix buildGraphMux so it does not overwrite s.pubSubProviders on each
call. Concretely, remove the workaround in ModifyRouterConfig (stop setting
FeatureFlagConfigs = nil) and change the implementation of buildGraphMux to
merge/init pub-sub providers instead of replacing s.pubSubProviders (ensure any
nil checks or initialization happen once and that existing providers are
preserved), or add an additional test that leaves FeatureFlagConfigs populated
and asserts s.pubSubProviders are not orphaned across successive buildGraphMux
invocations.
- Around line 1241-1257: Replace the single-shot assertions on JetStream
consumer counts with eventual checks to avoid races: after
env.WaitForSubscriptionCount(1) poll stream.Info via
js.Stream(...)/stream.Info(...) inside require.Eventually until
streamInfo.State.Consumers == 1, and after env.Shutdown use a fresh ctx and
again require.Eventually polling stream.Info until streamInfo.State.Consumers ==
0; use the same js.Stream/stream.Info calls and timeouts (e.g., NatsWaitTimeout)
so the test waits for JetStream to converge instead of asserting once.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e7c6a2f6-11cb-42ee-8f55-145b0543d907
📒 Files selected for processing (1)
router-tests/events/nats_events_test.go
There was a problem hiding this comment.
🧹 Nitpick comments (1)
router-tests/testenv/testenv.go (1)
2928-2935: Avoid baking the old shutdown behavior into the test helper.This helper now always creates subgraph-side NATS adapters with
deleteOnShutdown=false, so its cleanup path no longer matches the new production lifecycle. Consider defaulting testenv totruehere, or plumbing the flag throughtestenv.Config, so tests can opt into the same shutdown semantics they are exercising elsewhere.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@router-tests/testenv/testenv.go` around lines 2928 - 2935, The test helper currently constructs the NATS adapter with deleteOnShutdown hardcoded as false in the pubsubNats.NewAdapter call (see the adapter variable creation), which embeds old shutdown behavior into tests; change this to use the test environment config instead — either default the flag to true in the test helper or add a field to testenv.Config (e.g., DeleteOnShutdown or NATSDeleteOnShutdown) and pass that value into pubsubNats.NewAdapter instead of the literal false so tests can opt into the production shutdown semantics; update adapter.Startup/Shutdown cleanup usage accordingly to honor the configured flag.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@router-tests/testenv/testenv.go`:
- Around line 2928-2935: The test helper currently constructs the NATS adapter
with deleteOnShutdown hardcoded as false in the pubsubNats.NewAdapter call (see
the adapter variable creation), which embeds old shutdown behavior into tests;
change this to use the test environment config instead — either default the flag
to true in the test helper or add a field to testenv.Config (e.g.,
DeleteOnShutdown or NATSDeleteOnShutdown) and pass that value into
pubsubNats.NewAdapter instead of the literal false so tests can opt into the
production shutdown semantics; update adapter.Startup/Shutdown cleanup usage
accordingly to honor the configured flag.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 00f8e41d-d384-408b-937f-4ff4eecd833b
📒 Files selected for processing (2)
router-tests/testenv/testenv.gorouter/pkg/config/config.go
Summary by CodeRabbit
New Features
Configuration
Tests
Checklist
Adds a new configuration option to nats providers
to the router. This config tells the router to clean up all durable consumers on the nats server, which the router has created, when the router shuts down. The default is not to clean up to let the router stay identical to previous behaviour.
Usually there is an option for specifying the max age for an inactive consumers on the NATS server but there are users right now having issues with this. We need to investigate into that. Meanwhile this option provides an alternative to that.