From 849e728fd0d8c9518611e38251cea84f2f4ff900 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Fri, 20 Oct 2023 15:07:30 -0400 Subject: [PATCH 1/2] enhancement(nats source): add subscriber_capacity option --- src/sources/nats.rs | 17 +++++++++++++++++ .../reference/components/sources/base/nats.cue | 14 ++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/src/sources/nats.rs b/src/sources/nats.rs index a6fb139759fef..645c7bcefd808 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -97,12 +97,28 @@ pub struct NatsSourceConfig { /// The `NATS` subject key. #[serde(default = "default_subject_key_field")] subject_key_field: OptionalValuePath, + + /// The buffer capacity of the underlying NATS subscriber. + /// + /// This value determines how many messages the NATS subscriber will buffer + /// before incoming messages are dropped. + /// + /// See the [async_nats documentation][async_nats_subscription_capacity] for more information. + /// + /// [async_nats_subscription_capacity]: https://docs.rs/async-nats/latest/async_nats/struct.ConnectOptions.html#method.subscription_capacity + #[serde(default = "default_subscription_capacity")] + #[derivative(Default(value = "default_subscription_capacity()"))] + subscriber_capacity: usize, } fn default_subject_key_field() -> OptionalValuePath { OptionalValuePath::from(owned_value_path!("subject")) } +fn default_subscription_capacity() -> usize { + 4096 +} + impl GenerateConfig for NatsSourceConfig { fn generate_config() -> toml::Value { toml::from_str( @@ -178,6 +194,7 @@ impl TryFrom<&NatsSourceConfig> for async_nats::ConnectOptions { fn try_from(config: &NatsSourceConfig) -> Result { from_tls_auth_config(&config.connection_name, &config.auth, &config.tls) + .map(|options| options.subscription_capacity(config.subscriber_capacity)) } } diff --git a/website/cue/reference/components/sources/base/nats.cue b/website/cue/reference/components/sources/base/nats.cue index 81bc4bdca5b86..c2c8013a7f130 100644 --- a/website/cue/reference/components/sources/base/nats.cue +++ b/website/cue/reference/components/sources/base/nats.cue @@ -345,6 +345,20 @@ base: components: sources: nats: configuration: { required: false type: string: default: "subject" } + subscriber_capacity: { + description: """ + The buffer capacity of the underlying NATS subscriber. + + This value determines how many messages the NATS subscriber will buffer + before incoming messages are dropped. + + See the [async_nats documentation][async_nats_subscription_capacity] for more information. + + [async_nats_subscription_capacity]: https://docs.rs/async-nats/latest/async_nats/struct.ConnectOptions.html#method.subscription_capacity + """ + required: false + type: uint: default: 4096 + } tls: { description: "Configures the TLS options for incoming/outgoing connections." required: false From f9e4dbf9a6a6e7cdf64eca8c17f76e2c4da9d223 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Fri, 20 Oct 2023 15:51:25 -0400 Subject: [PATCH 2/2] feedback and fix tests --- src/sources/nats.rs | 17 +++++++++++++++-- .../reference/components/sources/base/nats.cue | 2 +- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/sources/nats.rs b/src/sources/nats.rs index 645c7bcefd808..f3023849ef562 100644 --- a/src/sources/nats.rs +++ b/src/sources/nats.rs @@ -100,7 +100,7 @@ pub struct NatsSourceConfig { /// The buffer capacity of the underlying NATS subscriber. /// - /// This value determines how many messages the NATS subscriber will buffer + /// This value determines how many messages the NATS subscriber buffers /// before incoming messages are dropped. /// /// See the [async_nats documentation][async_nats_subscription_capacity] for more information. @@ -115,7 +115,7 @@ fn default_subject_key_field() -> OptionalValuePath { OptionalValuePath::from(owned_value_path!("subject")) } -fn default_subscription_capacity() -> usize { +const fn default_subscription_capacity() -> usize { 4096 } @@ -432,6 +432,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -464,6 +465,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -496,6 +498,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -527,6 +530,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -558,6 +562,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -590,6 +595,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -622,6 +628,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -655,6 +662,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -682,6 +690,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -717,6 +726,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -750,6 +760,7 @@ mod integration_tests { auth: None, log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -787,6 +798,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; @@ -824,6 +836,7 @@ mod integration_tests { }), log_namespace: None, subject_key_field: default_subject_key_field(), + ..Default::default() }; let r = publish_and_check(conf).await; diff --git a/website/cue/reference/components/sources/base/nats.cue b/website/cue/reference/components/sources/base/nats.cue index c2c8013a7f130..21d814c94df9f 100644 --- a/website/cue/reference/components/sources/base/nats.cue +++ b/website/cue/reference/components/sources/base/nats.cue @@ -349,7 +349,7 @@ base: components: sources: nats: configuration: { description: """ The buffer capacity of the underlying NATS subscriber. - This value determines how many messages the NATS subscriber will buffer + This value determines how many messages the NATS subscriber buffers before incoming messages are dropped. See the [async_nats documentation][async_nats_subscription_capacity] for more information.