Skip to content
2 changes: 1 addition & 1 deletion src/sinks/nats/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl NatsSink {

async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let request = self.request.unwrap_with(&TowerRequestConfig {
concurrency: Concurrency::Fixed(1),
concurrency: Some(Concurrency::Fixed(1)),
..Default::default()
});

Expand Down
2 changes: 1 addition & 1 deletion src/sinks/redis/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl RedisSink {

async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let request = self.request.unwrap_with(&TowerRequestConfig {
concurrency: Concurrency::Fixed(1),
Comment thread
lukesteensen marked this conversation as resolved.
concurrency: Some(Concurrency::Fixed(1)),
..Default::default()
});

Expand Down
4 changes: 2 additions & 2 deletions src/sinks/util/adaptive_concurrency/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<L> Controller<L> {
// If a `concurrency` is specified, it becomes both the
// current limit and the maximum, effectively bypassing all the
// mechanisms. Otherwise, the current limit is set to 1 and the
// maximum to MAX_CONCURRENCY.
// maximum to `settings.max_concurrency_limit`.
let current_limit = concurrency.unwrap_or(settings.initial_concurrency);
Self {
semaphore: Arc::new(ShrinkableSemaphore::new(current_limit)),
Expand Down Expand Up @@ -226,7 +226,7 @@ impl<L> Controller<L> {
// concurrency limit. Note that we only check this if we had
// requests to go beyond the current limit to prevent
// increasing the limit beyond what we have evidence for.
if inner.current_limit < super::MAX_CONCURRENCY
if inner.current_limit < self.settings.max_concurrency_limit
&& inner.reached_limit
&& !inner.had_back_pressure
&& current_rtt.is_some()
Expand Down
20 changes: 11 additions & 9 deletions src/sinks/util/adaptive_concurrency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ mod service;
#[cfg(test)]
pub mod tests;

// Make sure to update the max range of the `AdaptiveConcurrencySettings::initial_concurrency` when changing
// this constant.
pub(super) const MAX_CONCURRENCY: usize = 200;

pub(crate) use layer::AdaptiveConcurrencyLimitLayer;
pub(crate) use service::AdaptiveConcurrencyLimit;
use vector_lib::configurable::configurable_component;
Expand All @@ -36,7 +32,7 @@ pub struct AdaptiveConcurrencySettings {
/// It is recommended to set this value to your service's average limit if you're seeing that it takes a
/// long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the
/// `adaptive_concurrency_limit` metric.
#[configurable(validation(range(min = 1, max = 200)))]
#[configurable(validation(range(min = 1)))]
#[serde(default = "default_initial_concurrency")]
pub(super) initial_concurrency: usize,

Expand Down Expand Up @@ -72,6 +68,13 @@ pub struct AdaptiveConcurrencySettings {
#[configurable(validation(range(min = 0.0)))]
#[serde(default = "default_rtt_deviation_scale")]
pub(super) rtt_deviation_scale: f64,

/// The maximum concurrency limit.
///
/// The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard.
#[configurable(validation(range(min = 1)))]
#[serde(default = "default_max_concurrency_limit")]
pub(super) max_concurrency_limit: usize,
}

const fn default_initial_concurrency() -> usize {
Expand All @@ -90,10 +93,8 @@ const fn default_rtt_deviation_scale() -> f64 {
2.5
}

impl AdaptiveConcurrencySettings {
pub const fn max_concurrency() -> usize {
MAX_CONCURRENCY
}
const fn default_max_concurrency_limit() -> usize {
200
}

impl Default for AdaptiveConcurrencySettings {
Expand All @@ -103,6 +104,7 @@ impl Default for AdaptiveConcurrencySettings {
decrease_ratio: default_decrease_ratio(),
ewma_alpha: default_ewma_alpha(),
rtt_deviation_scale: default_rtt_deviation_scale(),
max_concurrency_limit: default_max_concurrency_limit(),
}
}
}
2 changes: 1 addition & 1 deletion src/sinks/util/adaptive_concurrency/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ async fn run_test(params: TestParams) -> TestResults {

let test_config = TestConfig {
request: TowerRequestConfig {
concurrency: params.concurrency,
concurrency: Some(params.concurrency),
rate_limit_num: Some(9999),
timeout_secs: Some(1),
..Default::default()
Expand Down
Loading