Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions router-tests/events/nats_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ func TestNatsEvents(t *testing.T) {
},
ModifyEventsConfiguration: func(cfg *config.EventsConfiguration) {
for i := range cfg.Providers.Nats {
cfg.Providers.Nats[i].Consumers.Durable.DeleteOnShutdown = true
cfg.Providers.Nats[i].DeleteDurableConsumersOnShutdown = true
}
},
})
Expand Down Expand Up @@ -1270,7 +1270,7 @@ func TestNatsEvents(t *testing.T) {
},
ModifyEventsConfiguration: func(cfg *config.EventsConfiguration) {
for i := range cfg.Providers.Nats {
cfg.Providers.Nats[i].Consumers.Durable.DeleteOnShutdown = false
cfg.Providers.Nats[i].DeleteDurableConsumersOnShutdown = false
}
},
})
Expand Down
16 changes: 4 additions & 12 deletions router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,19 +616,11 @@ type NatsAuthentication struct {
NatsTokenBasedAuthentication `yaml:"token,inline"`
}

type NatsDurableConsumersConfiguration struct {
DeleteOnShutdown bool `yaml:"delete_on_shutdown" envDefault:"false"`
}

type NatsConsumersConfiguration struct {
Durable NatsDurableConsumersConfiguration `yaml:"durable,omitempty"`
}

type NatsEventSource struct {
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
Consumers NatsConsumersConfiguration `yaml:"consumers,omitempty"`
ID string `yaml:"id,omitempty"`
URL string `yaml:"url,omitempty"`
Authentication *NatsAuthentication `yaml:"authentication,omitempty"`
DeleteDurableConsumersOnShutdown bool `yaml:"experiment_delete_durable_consumers_on_shutdown" env:"EXPERIMENT_DELETE_DURABLE_CONSUMERS_ON_SHUTDOWN" envDefault:"false"`
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

func (n NatsEventSource) GetID() string {
Expand Down
24 changes: 5 additions & 19 deletions router/pkg/config/config.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"properties": {
"token": {
"type": "string",
"description": "The token used to authenticate with other component from Cosmo. Can be ommitted if the router is started with a static execution config."
"description": "The token used to authenticate with other component from Cosmo. Can be omitted if the router is started with a static execution config."
},
"sign_key": {
"type": "string",
Expand Down Expand Up @@ -2581,24 +2581,10 @@
}
]
},
"consumers": {
"type": "object",
"description": "Configuration for JetStream consumers managed by this NATS provider.",
"additionalProperties": false,
"properties": {
"durable": {
"type": "object",
"description": "Configuration for durable JetStream consumers managed by this NATS provider.",
"additionalProperties": false,
"properties": {
"delete_on_shutdown": {
"type": "boolean",
"description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down normally. Defaults to false.",
"default": false
}
}
}
}
"experiment_delete_durable_consumers_on_shutdown": {
"type": "boolean",
"description": "When enabled, all durable JetStream consumers created by this provider are deleted when the router shuts down normally. Defaults to false. NOTE: This option is experimental and may change in future versions.",
"default": false
}
}
}
Expand Down
12 changes: 2 additions & 10 deletions router/pkg/config/testdata/config_full.json
Original file line number Diff line number Diff line change
Expand Up @@ -657,11 +657,7 @@
"ID": "default",
"URL": "nats://localhost:4222",
"Authentication": null,
"Consumers": {
"Durable": {
"DeleteOnShutdown": false
}
}
"DeleteDurableConsumersOnShutdown": false
},
{
"ID": "my-nats",
Expand All @@ -673,11 +669,7 @@
},
"Token": null
},
"Consumers": {
"Durable": {
"DeleteOnShutdown": false
}
}
"DeleteDurableConsumersOnShutdown": false
}
],
"Kafka": [
Expand Down
2 changes: 1 addition & 1 deletion router/pkg/pubsub/nats/provider_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func buildProvider(ctx context.Context, provider config.NatsEventSource, logger
return nil, fmt.Errorf("failed to build options for Nats provider with ID \"%s\": %w", provider.ID, err)
}

adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.Consumers.Durable.DeleteOnShutdown, providerOpts)
adapter, err := NewAdapter(ctx, logger, provider.URL, options, hostName, routerListenAddr, provider.DeleteDurableConsumersOnShutdown, providerOpts)
if err != nil {
return nil, fmt.Errorf("failed to create adapter for Nats provider with ID \"%s\": %w", provider.ID, err)
}
Expand Down
Loading