diff --git a/src/components/validation/runner/config.rs b/src/components/validation/runner/config.rs index c087fd8bf809a..f80746ab21df1 100644 --- a/src/components/validation/runner/config.rs +++ b/src/components/validation/runner/config.rs @@ -141,7 +141,7 @@ fn build_output_edge() -> (OutputEdge, impl Into) { // we don't want to waste time performing retries, especially when the test // harness is shutting down. output_sink.batch.timeout_secs = Some(0.1); - output_sink.request.retry_attempts = Some(0); + output_sink.request.retry_attempts = 0; let output_edge = OutputEdge::from_address(output_listen_addr); diff --git a/src/components/validation/runner/telemetry.rs b/src/components/validation/runner/telemetry.rs index d5bb8f5b438d3..c415bfb05d012 100644 --- a/src/components/validation/runner/telemetry.rs +++ b/src/components/validation/runner/telemetry.rs @@ -52,7 +52,7 @@ impl Telemetry { // disable retries, as we don't want to waste time performing retries, // especially when the test harness is shutting down. vector_sink.batch.timeout_secs = Some(0.1); - vector_sink.request.retry_attempts = Some(0); + vector_sink.request.retry_attempts = 0; config_builder.add_source(INTERNAL_LOGS_KEY, internal_logs); config_builder.add_source(INTERNAL_METRICS_KEY, internal_metrics); diff --git a/src/sinks/appsignal/config.rs b/src/sinks/appsignal/config.rs index 2fc0bdb4907b5..137757f3a7fc8 100644 --- a/src/sinks/appsignal/config.rs +++ b/src/sinks/appsignal/config.rs @@ -102,7 +102,7 @@ impl AppsignalConfig { let service = AppsignalService::new(http_client, endpoint, push_api_key, compression); let request_opts = self.request; - let request_settings = request_opts.unwrap_with(&TowerRequestConfig::default()); + let request_settings = request_opts.into_settings(); let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status); let service = ServiceBuilder::new() diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index 8b8b94662376c..18e9201421c6f 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -25,7 +25,6 @@ use crate::{ }, util::{ http::RequestConfig, BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, - TowerRequestConfig, }, Healthcheck, VectorSink, }, @@ -211,10 +210,7 @@ impl CloudwatchLogsSinkConfig { impl SinkConfig for CloudwatchLogsSinkConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let batcher_settings = self.batch.into_batcher_settings()?; - let request_settings = self - .request - .tower - .unwrap_with(&TowerRequestConfig::default()); + let request_settings = self.request.tower.into_settings(); let client = self.create_client(cx.proxy()).await?; let smithy_client = self.create_smithy_client(cx.proxy()).await?; let svc = ServiceBuilder::new() diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index 1278a6e9f63f1..bedfc0d3574cb 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -34,9 +34,7 @@ use crate::sinks::{ config::CloudwatchLogsSinkConfig, config::Retention, request, retry::CloudwatchRetryLogic, sink::BatchCloudwatchRequest, CloudwatchKey, }, - util::{ - retries::FibonacciRetryPolicy, EncodedLength, TowerRequestConfig, TowerRequestSettings, - }, + util::{retries::FibonacciRetryPolicy, EncodedLength, TowerRequestSettings}, }; type Svc = Buffer< @@ -138,10 +136,7 @@ impl CloudwatchLogsPartitionSvc { // https://github.com/awslabs/aws-sdk-rust/issues/537 smithy_client: SmithyClient, ) -> Self { - let request_settings = config - .request - .tower - .unwrap_with(&TowerRequestConfig::default()); + let request_settings = config.request.tower.into_settings(); Self { config, diff --git a/src/sinks/aws_cloudwatch_metrics/mod.rs b/src/sinks/aws_cloudwatch_metrics/mod.rs index af81043a29bdb..8032aa41d0e5b 100644 --- a/src/sinks/aws_cloudwatch_metrics/mod.rs +++ b/src/sinks/aws_cloudwatch_metrics/mod.rs @@ -36,6 +36,8 @@ use crate::{ tls::TlsConfig, }; +use super::util::service::TowerRequestConfigDefaults; + #[derive(Clone, Copy, Debug, Default)] pub struct CloudWatchMetricsDefaultBatchSettings; @@ -45,6 +47,13 @@ impl SinkBatchSettings for CloudWatchMetricsDefaultBatchSettings { const TIMEOUT_SECS: f64 = 1.0; } +#[derive(Clone, Copy, Debug)] +pub struct CloudWatchMetricsTowerRequestConfigDefaults; + +impl TowerRequestConfigDefaults for CloudWatchMetricsTowerRequestConfigDefaults { + const RATE_LIMIT_NUM: u64 = 150; +} + /// Configuration for the `aws_cloudwatch_metrics` sink. #[configurable_component(sink( "aws_cloudwatch_metrics", @@ -79,7 +88,7 @@ pub struct CloudWatchMetricsSinkConfig { #[configurable(derived)] #[serde(default)] - pub request: TowerRequestConfig, + pub request: TowerRequestConfig, #[configurable(derived)] pub tls: Option, @@ -225,11 +234,7 @@ impl CloudWatchMetricsSvc { ) -> crate::Result { let default_namespace = config.default_namespace.clone(); let batch = config.batch.into_batch_settings()?; - let request_settings = config.request.unwrap_with( - &TowerRequestConfig::default() - .timeout_secs(30) - .rate_limit_num(150), - ); + let request_settings = config.request.into_settings(); let service = CloudWatchMetricsSvc { client }; let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size)); diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index 341063a0f2f19..d5356efc9fbbb 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -95,7 +95,7 @@ where E: Send + 'static, RT: RetryLogic + Default, { - let request_limits = config.request.unwrap_with(&TowerRequestConfig::default()); + let request_limits = config.request.into_settings(); let region = config.region.region(); let service = ServiceBuilder::new() diff --git a/src/sinks/aws_kinesis/firehose/integration_tests.rs b/src/sinks/aws_kinesis/firehose/integration_tests.rs index 002df851b1085..7ce6ab05beb2f 100644 --- a/src/sinks/aws_kinesis/firehose/integration_tests.rs +++ b/src/sinks/aws_kinesis/firehose/integration_tests.rs @@ -52,8 +52,8 @@ async fn firehose_put_records() { encoding: JsonSerializerConfig::default().into(), // required for ES destination w/ localstack compression: Compression::None, request: TowerRequestConfig { - timeout_secs: Some(10), - retry_attempts: Some(0), + timeout_secs: 10, + retry_attempts: 0, ..Default::default() }, tls: None, diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 4aad4417734a8..d68fc051b73cb 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -201,7 +201,7 @@ impl S3SinkConfig { // requests into in order to ship files to S3. We build this here in // order to configure the client/service with retries, concurrency // limits, rate limits, and whatever else the client should have. - let request_limits = self.request.unwrap_with(&Default::default()); + let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_limits, S3RetryLogic) .service(service); diff --git a/src/sinks/aws_s_s/sink.rs b/src/sinks/aws_s_s/sink.rs index 41eb42f82e9f6..d933dcd752945 100644 --- a/src/sinks/aws_s_s/sink.rs +++ b/src/sinks/aws_s_s/sink.rs @@ -40,9 +40,7 @@ where } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let request = self - .request - .unwrap_with(&TowerRequestConfig::default().timeout_secs(30)); + let request = self.request.into_settings(); let retry_logic: SSRetryLogic = super::retry::SSRetryLogic::new(); let service = tower::ServiceBuilder::new() .settings(request, retry_logic) diff --git a/src/sinks/azure_blob/config.rs b/src/sinks/azure_blob/config.rs index 859af08823b4b..d429f88c1ad42 100644 --- a/src/sinks/azure_blob/config.rs +++ b/src/sinks/azure_blob/config.rs @@ -7,6 +7,7 @@ use vector_lib::configurable::configurable_component; use vector_lib::sensitive_string::SensitiveString; use super::request_builder::AzureBlobRequestOptions; +use crate::sinks::util::service::TowerRequestConfigDefaults; use crate::{ codecs::{Encoder, EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, @@ -24,6 +25,13 @@ use crate::{ Result, }; +#[derive(Clone, Copy, Debug)] +pub struct AzureBlobTowerRequestConfigDefaults; + +impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults { + const RATE_LIMIT_NUM: u64 = 250; +} + /// Configuration for the `azure_blob` sink. #[configurable_component(sink( "azure_blob", @@ -131,7 +139,7 @@ pub struct AzureBlobSinkConfig { #[configurable(derived)] #[serde(default)] - pub request: TowerRequestConfig, + pub request: TowerRequestConfig, #[configurable(derived)] #[serde( @@ -202,9 +210,7 @@ const DEFAULT_FILENAME_APPEND_UUID: bool = true; impl AzureBlobSinkConfig { pub fn build_processor(&self, client: Arc) -> crate::Result { - let request_limits = self - .request - .unwrap_with(&TowerRequestConfig::default().rate_limit_num(250)); + let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_limits, AzureBlobRetryLogic) .service(AzureBlobService::new(client)); diff --git a/src/sinks/azure_monitor_logs/config.rs b/src/sinks/azure_monitor_logs/config.rs index e772fe53f762b..258c6eacab267 100644 --- a/src/sinks/azure_monitor_logs/config.rs +++ b/src/sinks/azure_monitor_logs/config.rs @@ -185,7 +185,7 @@ impl AzureMonitorLogsConfig { let retry_logic = HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status); - let request_settings = self.request.unwrap_with(&Default::default()); + let request_settings = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_settings, retry_logic) .service(service); diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs index 0c8cc164633f8..312c1b26467f7 100644 --- a/src/sinks/clickhouse/config.rs +++ b/src/sinks/clickhouse/config.rs @@ -95,7 +95,7 @@ impl SinkConfig for ClickhouseConfig { self.date_time_best_effort, ); - let request_limits = self.request.unwrap_with(&Default::default()); + let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_limits, ClickhouseRetryLogic::default()) .service(service); diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs index 05ee0ee4e333d..79a6bc9c1985f 100644 --- a/src/sinks/clickhouse/integration_tests.rs +++ b/src/sinks/clickhouse/integration_tests.rs @@ -50,7 +50,7 @@ async fn insert_events() { compression: Compression::None, batch, request: TowerRequestConfig { - retry_attempts: Some(1), + retry_attempts: 1, ..Default::default() }, ..Default::default() @@ -100,7 +100,7 @@ async fn skip_unknown_fields() { compression: Compression::None, batch, request: TowerRequestConfig { - retry_attempts: Some(1), + retry_attempts: 1, ..Default::default() }, ..Default::default() @@ -146,7 +146,7 @@ async fn insert_events_unix_timestamps() { encoding: Transformer::new(None, None, Some(TimestampFormat::Unix)).unwrap(), batch, request: TowerRequestConfig { - retry_attempts: Some(1), + retry_attempts: 1, ..Default::default() }, ..Default::default() diff --git a/src/sinks/databend/config.rs b/src/sinks/databend/config.rs index 4a29b97f4a807..d9d4fbabfd5bd 100644 --- a/src/sinks/databend/config.rs +++ b/src/sinks/databend/config.rs @@ -116,7 +116,7 @@ impl SinkConfig for DatabendConfig { DatabendAPIClient::new(self.build_client(&cx)?, endpoint.clone(), auth.clone()); let healthcheck = select_one(health_client).boxed(); - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); + let request_settings = self.request.into_settings(); let batch_settings = self.batch.into_batcher_settings()?; let database = config.database; diff --git a/src/sinks/datadog/events/config.rs b/src/sinks/datadog/events/config.rs index 166881667db91..74f01c81ddbca 100644 --- a/src/sinks/datadog/events/config.rs +++ b/src/sinks/datadog/events/config.rs @@ -72,7 +72,7 @@ impl DatadogEventsConfig { ); let request_opts = self.request; - let request_settings = request_opts.unwrap_with(&TowerRequestConfig::default()); + let request_settings = request_opts.into_settings(); let retry_logic = HttpStatusRetryLogic::new(|req: &DatadogEventsResponse| req.http_status); let service = ServiceBuilder::new() diff --git a/src/sinks/datadog/logs/config.rs b/src/sinks/datadog/logs/config.rs index e69181c38e65e..3ed6a7938fed9 100644 --- a/src/sinks/datadog/logs/config.rs +++ b/src/sinks/datadog/logs/config.rs @@ -104,7 +104,7 @@ impl DatadogLogsConfig { dd_evp_origin: String, ) -> crate::Result { let default_api_key: Arc = Arc::from(self.dd_common.default_api_key.inner()); - let request_limits = self.request.tower.unwrap_with(&Default::default()); + let request_limits = self.request.tower.into_settings(); // We forcefully cap the provided batch configuration to the size/log line limits imposed by // the Datadog Logs API, but we still allow them to be lowered if need be. diff --git a/src/sinks/datadog/metrics/config.rs b/src/sinks/datadog/metrics/config.rs index b774beccc2001..a09536f4796e4 100644 --- a/src/sinks/datadog/metrics/config.rs +++ b/src/sinks/datadog/metrics/config.rs @@ -31,9 +31,6 @@ use crate::{ pub const MAXIMUM_PAYLOAD_COMPRESSED_SIZE: usize = 3_200_000; pub const MAXIMUM_PAYLOAD_SIZE: usize = 62_914_560; -// TODO: revisit our concurrency and batching defaults -const DEFAULT_REQUEST_RETRY_ATTEMPTS: usize = 5; - #[derive(Clone, Copy, Debug, Default)] pub struct DatadogMetricsDefaultBatchSettings; @@ -232,9 +229,7 @@ impl DatadogMetricsConfig { let batcher_settings = self.batch.into_batcher_settings()?; // TODO: revisit our concurrency and batching defaults - let request_limits = self.request.unwrap_with( - &TowerRequestConfig::default().retry_attempts(DEFAULT_REQUEST_RETRY_ATTEMPTS), - ); + let request_limits = self.request.into_settings(); let endpoint_configuration = self.generate_metrics_endpoint_configuration()?; let service = ServiceBuilder::new() diff --git a/src/sinks/datadog/traces/config.rs b/src/sinks/datadog/traces/config.rs index b3fe500ba2311..c28281b6ee3b3 100644 --- a/src/sinks/datadog/traces/config.rs +++ b/src/sinks/datadog/traces/config.rs @@ -41,9 +41,6 @@ pub const BATCH_DEFAULT_TIMEOUT_SECS: f64 = 10.0; pub const PAYLOAD_LIMIT: usize = 3_200_000; -const DEFAULT_REQUEST_RETRY_ATTEMPTS: usize = 5; -const DEFAULT_REQUEST_RETRY_MAX_DURATION_SECS: u64 = 300; - #[derive(Clone, Copy, Debug, Default)] pub struct DatadogTracesDefaultBatchSettings; @@ -130,11 +127,7 @@ impl DatadogTracesConfig { pub fn build_sink(&self, client: HttpClient) -> crate::Result { let default_api_key: Arc = Arc::from(self.dd_common.default_api_key.inner()); - let request_limits = self.request.unwrap_with( - &TowerRequestConfig::default() - .retry_attempts(DEFAULT_REQUEST_RETRY_ATTEMPTS) - .retry_max_duration_secs(DEFAULT_REQUEST_RETRY_MAX_DURATION_SECS), - ); + let request_limits = self.request.into_settings(); let endpoints = self.generate_traces_endpoint_configuration()?; let batcher_settings = self diff --git a/src/sinks/elasticsearch/common.rs b/src/sinks/elasticsearch/common.rs index 27b71e527dae1..04e01fd457ebf 100644 --- a/src/sinks/elasticsearch/common.rs +++ b/src/sinks/elasticsearch/common.rs @@ -19,7 +19,7 @@ use crate::{ ElasticsearchAuthConfig, ElasticsearchCommonMode, ElasticsearchConfig, ParseError, }, util::auth::Auth, - util::{http::RequestConfig, TowerRequestConfig, UriSerde}, + util::{http::RequestConfig, UriSerde}, HealthcheckError, }, tls::TlsSettings, @@ -89,10 +89,7 @@ impl ElasticsearchCommon { let mode = config.common_mode()?; - let tower_request = config - .request - .tower - .unwrap_with(&TowerRequestConfig::default()); + let tower_request = config.request.tower.into_settings(); let mut query_params = config.query.clone().unwrap_or_default(); query_params.insert( diff --git a/src/sinks/elasticsearch/config.rs b/src/sinks/elasticsearch/config.rs index 89b4095d52aaf..2a08bf42136f5 100644 --- a/src/sinks/elasticsearch/config.rs +++ b/src/sinks/elasticsearch/config.rs @@ -23,7 +23,7 @@ use crate::{ }, util::{ http::RequestConfig, service::HealthConfig, BatchConfig, Compression, - RealtimeSizeBasedDefaultBatchSettings, TowerRequestConfig, + RealtimeSizeBasedDefaultBatchSettings, }, Healthcheck, VectorSink, }, @@ -482,10 +482,7 @@ impl SinkConfig for ElasticsearchConfig { let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?; - let request_limits = self - .request - .tower - .unwrap_with(&TowerRequestConfig::default()); + let request_limits = self.request.tower.into_settings(); let health_config = self.endpoint_health.clone().unwrap_or_default(); diff --git a/src/sinks/gcp/chronicle_unstructured.rs b/src/sinks/gcp/chronicle_unstructured.rs index a0fe97cd9073b..bebc96b0d9a43 100644 --- a/src/sinks/gcp/chronicle_unstructured.rs +++ b/src/sinks/gcp/chronicle_unstructured.rs @@ -22,6 +22,7 @@ use vector_lib::{ }; use vrl::value::Kind; +use crate::sinks::util::service::TowerRequestConfigDefaults; use crate::{ codecs::{self, EncodingConfig}, config::{GenerateConfig, SinkConfig, SinkContext}, @@ -97,6 +98,12 @@ impl SinkBatchSettings for ChronicleUnstructuredDefaultBatchSettings { const TIMEOUT_SECS: f64 = 15.0; } +#[derive(Clone, Copy, Debug)] +pub struct ChronicleUnstructuredTowerRequestConfigDefaults; + +impl TowerRequestConfigDefaults for ChronicleUnstructuredTowerRequestConfigDefaults { + const RATE_LIMIT_NUM: u64 = 1_000; +} /// Configuration for the `gcp_chronicle_unstructured` sink. #[configurable_component(sink( "gcp_chronicle_unstructured", @@ -132,7 +139,7 @@ pub struct ChronicleUnstructuredConfig { #[configurable(derived)] #[serde(default)] - pub request: TowerRequestConfig, + pub request: TowerRequestConfig, #[configurable(derived)] pub tls: Option, @@ -235,10 +242,7 @@ impl ChronicleUnstructuredConfig { ) -> crate::Result { use crate::sinks::util::service::ServiceBuilderExt; - let request = self.request.unwrap_with(&TowerRequestConfig { - rate_limit_num: Some(1000), - ..Default::default() - }); + let request = self.request.into_settings(); let batch_settings = self.batch.into_batcher_settings()?; diff --git a/src/sinks/gcp/cloud_storage.rs b/src/sinks/gcp/cloud_storage.rs index bb06e6a18ceba..e84519c57141f 100644 --- a/src/sinks/gcp/cloud_storage.rs +++ b/src/sinks/gcp/cloud_storage.rs @@ -15,6 +15,7 @@ use vector_lib::event::{EventFinalizers, Finalizable}; use vector_lib::{request_metadata::RequestMetadata, TimeZone}; use crate::sinks::util::metadata::RequestMetadataBuilder; +use crate::sinks::util::service::TowerRequestConfigDefaults; use crate::{ codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer}, config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, @@ -48,6 +49,13 @@ pub enum GcsHealthcheckError { KeyPrefixTemplate { source: TemplateParseError }, } +#[derive(Clone, Copy, Debug)] +pub struct GcsTowerRequestConfigDefaults; + +impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults { + const RATE_LIMIT_NUM: u64 = 1_000; +} + /// Configuration for the `gcp_cloud_storage` sink. #[configurable_component(sink( "gcp_cloud_storage", @@ -149,7 +157,7 @@ pub struct GcsSinkConfig { #[configurable(derived)] #[serde(default)] - request: TowerRequestConfig, + request: TowerRequestConfig, #[serde(flatten)] auth: GcpAuthConfig, @@ -245,10 +253,7 @@ impl GcsSinkConfig { auth: GcpAuthenticator, cx: SinkContext, ) -> crate::Result { - let request = self.request.unwrap_with(&TowerRequestConfig { - rate_limit_num: Some(1000), - ..Default::default() - }); + let request = self.request.into_settings(); let batch_settings = self.batch.into_batcher_settings()?; diff --git a/src/sinks/gcp/pubsub.rs b/src/sinks/gcp/pubsub.rs index 658b62061de63..876fe9ba8e5ac 100644 --- a/src/sinks/gcp/pubsub.rs +++ b/src/sinks/gcp/pubsub.rs @@ -123,7 +123,7 @@ impl SinkConfig for PubsubConfig { .validate()? .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)? .into_batch_settings()?; - let request_settings = self.request.unwrap_with(&Default::default()); + let request_settings = self.request.into_settings(); let tls_settings = TlsSettings::from_options(&self.tls)?; let client = HttpClient::new(tls_settings, cx.proxy())?; diff --git a/src/sinks/gcp/stackdriver/logs/config.rs b/src/sinks/gcp/stackdriver/logs/config.rs index 9e3a82eb4c3b5..164570ecb0427 100644 --- a/src/sinks/gcp/stackdriver/logs/config.rs +++ b/src/sinks/gcp/stackdriver/logs/config.rs @@ -9,6 +9,7 @@ use crate::{ prelude::*, util::{ http::{http_response_retry_logic, HttpService}, + service::TowerRequestConfigDefaults, BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings, }, }, @@ -31,6 +32,13 @@ enum HealthcheckError { NotFound, } +#[derive(Clone, Copy, Debug)] +pub struct StackdriverTowerRequestConfigDefaults; + +impl TowerRequestConfigDefaults for StackdriverTowerRequestConfigDefaults { + const RATE_LIMIT_NUM: u64 = 1_000; +} + /// Configuration for the `gcp_stackdriver_logs` sink. #[configurable_component(sink( "gcp_stackdriver_logs", @@ -85,7 +93,7 @@ pub(super) struct StackdriverConfig { #[configurable(derived)] #[serde(default)] - pub(super) request: TowerRequestConfig, + pub(super) request: TowerRequestConfig, #[configurable(derived)] pub(super) tls: Option, @@ -214,11 +222,7 @@ impl SinkConfig for StackdriverConfig { .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)? .into_batcher_settings()?; - let request_limits = self.request.unwrap_with( - &TowerRequestConfig::default() - .rate_limit_duration_secs(1) - .rate_limit_num(1000), - ); + let request_limits = self.request.into_settings(); let tls_settings = TlsSettings::from_options(&self.tls)?; let client = HttpClient::new(tls_settings, cx.proxy())?; diff --git a/src/sinks/gcp/stackdriver/metrics/config.rs b/src/sinks/gcp/stackdriver/metrics/config.rs index 99d03d114fd29..fa0f86d8ce9f6 100644 --- a/src/sinks/gcp/stackdriver/metrics/config.rs +++ b/src/sinks/gcp/stackdriver/metrics/config.rs @@ -8,7 +8,10 @@ use crate::{ sinks::{ gcp, prelude::*, - util::http::{http_response_retry_logic, HttpService, HttpServiceRequestBuilder}, + util::{ + http::{http_response_retry_logic, HttpService, HttpServiceRequestBuilder}, + service::TowerRequestConfigDefaults, + }, }, }; @@ -17,6 +20,13 @@ use super::{ sink::StackdriverMetricsSink, }; +#[derive(Clone, Copy, Debug)] +pub struct StackdriverMetricsTowerRequestConfigDefaults; + +impl TowerRequestConfigDefaults for StackdriverMetricsTowerRequestConfigDefaults { + const RATE_LIMIT_NUM: u64 = 1_000; +} + /// Configuration for the `gcp_stackdriver_metrics` sink. #[configurable_component(sink( "gcp_stackdriver_metrics", @@ -49,7 +59,7 @@ pub struct StackdriverConfig { #[configurable(derived)] #[serde(default)] - pub(super) request: TowerRequestConfig, + pub(super) request: TowerRequestConfig, #[configurable(derived)] #[serde(default)] @@ -98,11 +108,7 @@ impl SinkConfig for StackdriverConfig { }, }; - let request_limits = self.request.unwrap_with( - &TowerRequestConfig::default() - .rate_limit_duration_secs(1) - .rate_limit_num(1000), - ); + let request_limits = self.request.into_settings(); let uri: Uri = format!( "{}/v3/projects/{}/timeSeries", diff --git a/src/sinks/greptimedb/mod.rs b/src/sinks/greptimedb/mod.rs index ab1d132adc98b..4891ea32b6cc8 100644 --- a/src/sinks/greptimedb/mod.rs +++ b/src/sinks/greptimedb/mod.rs @@ -116,7 +116,7 @@ impl_generate_config_from_default!(GreptimeDBConfig); #[async_trait::async_trait] impl SinkConfig for GreptimeDBConfig { async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); + let request_settings = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_settings, GreptimeDBRetryLogic) .service(service::GreptimeDBService::try_new(self)?); diff --git a/src/sinks/honeycomb/config.rs b/src/sinks/honeycomb/config.rs index 6dda01074f7f0..50e656ae2b197 100644 --- a/src/sinks/honeycomb/config.rs +++ b/src/sinks/honeycomb/config.rs @@ -113,7 +113,7 @@ impl SinkConfig for HoneycombConfig { let service = HttpService::new(client.clone(), honeycomb_service_request_builder); - let request_limits = self.request.unwrap_with(&TowerRequestConfig::default()); + let request_limits = self.request.into_settings(); let service = ServiceBuilder::new() .settings(request_limits, http_response_retry_logic()) diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index 0bc67044012a4..a9b7b2aca5a07 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -286,7 +286,7 @@ impl SinkConfig for HttpSinkConfig { let service = HttpService::new(client, http_sink_request_builder); - let request_limits = self.request.tower.unwrap_with(&Default::default()); + let request_limits = self.request.tower.into_settings(); let service = ServiceBuilder::new() .settings(request_limits, http_response_retry_logic()) diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index bd5c8460407f3..3901ba1d97166 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -171,10 +171,7 @@ impl SinkConfig for InfluxDbLogsConfig { let healthcheck = self.healthcheck(client.clone())?; let batch = self.batch.into_batch_settings()?; - let request = self.request.unwrap_with(&TowerRequestConfig { - retry_attempts: Some(5), - ..Default::default() - }); + let request = self.request.into_settings(); let settings = influxdb_settings( self.influxdb1_settings.clone(), diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index 597c3e2190602..6b8232c70b5ef 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -159,10 +159,7 @@ impl InfluxDbSvc { let protocol_version = settings.protocol_version(); let batch = config.batch.into_batch_settings()?; - let request = config.request.unwrap_with(&TowerRequestConfig { - retry_attempts: Some(5), - ..Default::default() - }); + let request = config.request.into_settings(); let uri = settings.write_uri(endpoint)?; diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index cc3127415bf62..f244c1b5f025a 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -389,9 +389,9 @@ impl LokiSink { // requires in-order processing for version >= 2.4, instead we just keep the static limit // of 1 for now. let request_limits = match config.out_of_order_action { - OutOfOrderAction::Accept => config.request.unwrap_with(&Default::default()), + OutOfOrderAction::Accept => config.request.into_settings(), OutOfOrderAction::Drop | OutOfOrderAction::RewriteTimestamp => { - let mut settings = config.request.unwrap_with(&Default::default()); + let mut settings = config.request.into_settings(); settings.concurrency = Some(1); settings } diff --git a/src/sinks/mezmo.rs b/src/sinks/mezmo.rs index 149b2c024c812..b28e73f39c1b7 100644 --- a/src/sinks/mezmo.rs +++ b/src/sinks/mezmo.rs @@ -163,7 +163,7 @@ impl SinkConfig for MezmoConfig { &self, cx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); + let request_settings = self.request.into_settings(); let batch_settings = self.batch.into_batch_settings()?; let client = HttpClient::new(None, cx.proxy())?; diff --git a/src/sinks/nats/config.rs b/src/sinks/nats/config.rs index db78aff4a9bdd..608e4895d12fa 100644 --- a/src/sinks/nats/config.rs +++ b/src/sinks/nats/config.rs @@ -5,11 +5,18 @@ use vector_lib::tls::TlsEnableableConfig; use crate::{ nats::{from_tls_auth_config, NatsAuthConfig, NatsConfigError}, - sinks::prelude::*, + sinks::{prelude::*, util::service::TowerRequestConfigDefaults}, }; use super::{sink::NatsSink, ConfigSnafu, ConnectSnafu, NatsError}; +#[derive(Clone, Copy, Debug)] +pub struct NatsTowerRequestConfigDefaults; + +impl TowerRequestConfigDefaults for NatsTowerRequestConfigDefaults { + const CONCURRENCY: Concurrency = Concurrency::None; +} + /// Configuration for the `nats` sink. #[configurable_component(sink( "nats", @@ -68,7 +75,7 @@ pub struct NatsSinkConfig { #[configurable(derived)] #[serde(default)] - pub(super) request: TowerRequestConfig, + pub(super) request: TowerRequestConfig, } fn default_name() -> String { diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs index f6bea42919e9d..f2f4524b6ecfb 100644 --- a/src/sinks/nats/sink.rs +++ b/src/sinks/nats/sink.rs @@ -5,7 +5,7 @@ use snafu::ResultExt; use crate::sinks::prelude::*; use super::{ - config::NatsSinkConfig, + config::{NatsSinkConfig, NatsTowerRequestConfigDefaults}, request_builder::{NatsEncoder, NatsRequestBuilder}, service::{NatsResponse, NatsService}, EncodingSnafu, NatsError, @@ -17,7 +17,7 @@ pub(super) struct NatsEvent { } pub(super) struct NatsSink { - request: TowerRequestConfig, + request: TowerRequestConfig, transformer: Transformer, encoder: Encoder<()>, connection: Arc, @@ -59,10 +59,7 @@ impl NatsSink { } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let request = self.request.unwrap_with(&TowerRequestConfig { - concurrency: Concurrency::Fixed(1), - ..Default::default() - }); + let request = self.request.into_settings(); let request_builder = NatsRequestBuilder { encoder: NatsEncoder { diff --git a/src/sinks/new_relic/config.rs b/src/sinks/new_relic/config.rs index 6ac338929a0aa..f13015add1da8 100644 --- a/src/sinks/new_relic/config.rs +++ b/src/sinks/new_relic/config.rs @@ -141,7 +141,7 @@ impl SinkConfig for NewRelicConfig { .limit_max_events(self.batch.max_events.unwrap_or(100))? .into_batcher_settings()?; - let request_limits = self.request.unwrap_with(&Default::default()); + let request_limits = self.request.into_settings(); let tls_settings = TlsSettings::from_options(&None)?; let client = HttpClient::new(tls_settings, &cx.proxy)?; let credentials = Arc::from(NewRelicCredentials::from(self)); diff --git a/src/sinks/prometheus/remote_write/config.rs b/src/sinks/prometheus/remote_write/config.rs index 8f9f065642bdf..954103b6a6631 100644 --- a/src/sinks/prometheus/remote_write/config.rs +++ b/src/sinks/prometheus/remote_write/config.rs @@ -135,7 +135,7 @@ impl SinkConfig for RemoteWriteConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let endpoint = self.endpoint.parse::().context(UriParseSnafu)?; let tls_settings = TlsSettings::from_options(&self.tls)?; - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); + let request_settings = self.request.into_settings(); let buckets = self.buckets.clone(); let quantiles = self.quantiles.clone(); let default_namespace = self.default_namespace.clone(); diff --git a/src/sinks/redis/config.rs b/src/sinks/redis/config.rs index 0504cda3ceb30..b478fc0caba5a 100644 --- a/src/sinks/redis/config.rs +++ b/src/sinks/redis/config.rs @@ -1,10 +1,17 @@ use redis::{aio::ConnectionManager, RedisResult}; use snafu::prelude::*; -use crate::sinks::prelude::*; +use crate::sinks::{prelude::*, util::service::TowerRequestConfigDefaults}; use super::{sink::RedisSink, RedisCreateFailedSnafu}; +#[derive(Clone, Copy, Debug)] +pub struct RedisTowerRequestConfigDefaults; + +impl TowerRequestConfigDefaults for RedisTowerRequestConfigDefaults { + const CONCURRENCY: Concurrency = Concurrency::None; +} + /// Redis data type to store messages in. #[configurable_component] #[derive(Clone, Copy, Debug, Derivative)] @@ -98,7 +105,7 @@ pub struct RedisSinkConfig { #[configurable(derived)] #[serde(default)] - pub(super) request: TowerRequestConfig, + pub(super) request: TowerRequestConfig, #[configurable(derived)] #[serde( diff --git a/src/sinks/redis/integration_tests.rs b/src/sinks/redis/integration_tests.rs index 1461b3896a25f..4cc1856cb62d3 100644 --- a/src/sinks/redis/integration_tests.rs +++ b/src/sinks/redis/integration_tests.rs @@ -44,7 +44,7 @@ async fn redis_sink_list_lpush() { }), batch: BatchConfig::default(), request: TowerRequestConfig { - rate_limit_num: Some(u64::MAX), + rate_limit_num: u64::MAX, ..Default::default() }, acknowledgements: Default::default(), @@ -108,7 +108,7 @@ async fn redis_sink_list_rpush() { }), batch: BatchConfig::default(), request: TowerRequestConfig { - rate_limit_num: Some(u64::MAX), + rate_limit_num: u64::MAX, ..Default::default() }, acknowledgements: Default::default(), @@ -186,7 +186,7 @@ async fn redis_sink_channel() { list_option: None, batch: BatchConfig::default(), request: TowerRequestConfig { - rate_limit_num: Some(u64::MAX), + rate_limit_num: u64::MAX, ..Default::default() }, acknowledgements: Default::default(), @@ -262,7 +262,7 @@ async fn redis_sink_channel_data_volume_tags() { list_option: None, batch: BatchConfig::default(), request: TowerRequestConfig { - rate_limit_num: Some(u64::MAX), + rate_limit_num: u64::MAX, ..Default::default() }, acknowledgements: Default::default(), diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs index 128ce05adddef..5cac95f7698d7 100644 --- a/src/sinks/redis/sink.rs +++ b/src/sinks/redis/sink.rs @@ -2,20 +2,17 @@ use std::future; use redis::{aio::ConnectionManager, RedisError}; -use crate::sinks::{ - prelude::*, - util::{retries::RetryAction, Concurrency}, -}; +use crate::sinks::{prelude::*, util::retries::RetryAction}; use super::{ - config::{DataTypeConfig, RedisSinkConfig}, + config::{DataTypeConfig, RedisSinkConfig, RedisTowerRequestConfigDefaults}, request_builder::request_builder, service::{RedisResponse, RedisService}, RedisEvent, }; pub(super) struct RedisSink { - request: TowerRequestConfig, + request: TowerRequestConfig, encoder: crate::codecs::Encoder<()>, transformer: crate::codecs::Transformer, conn: ConnectionManager, @@ -70,10 +67,7 @@ impl RedisSink { } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let request = self.request.unwrap_with(&TowerRequestConfig { - concurrency: Concurrency::Fixed(1), - ..Default::default() - }); + let request = self.request.into_settings(); let service = RedisService { conn: self.conn.clone(), diff --git a/src/sinks/sematext/metrics.rs b/src/sinks/sematext/metrics.rs index 14a3f37470709..33138fdfd99b6 100644 --- a/src/sinks/sematext/metrics.rs +++ b/src/sinks/sematext/metrics.rs @@ -166,10 +166,7 @@ impl SematextMetricsService { client: HttpClient, ) -> Result { let batch = config.batch.into_batch_settings()?; - let request = config.request.unwrap_with(&TowerRequestConfig { - retry_attempts: Some(5), - ..Default::default() - }); + let request = config.request.into_settings(); let http_service = HttpBatchService::new(client, create_build_request(endpoint)); let sematext_service = SematextMetricsService { config, diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 2bd2515f237c0..4fa130187f7c6 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -239,7 +239,7 @@ impl HecLogsSinkConfig { compression: self.compression, }; - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); + let request_settings = self.request.into_settings(); let http_request_builder = Arc::new(HttpRequestBuilder::new( self.endpoint.clone(), self.endpoint_target, diff --git a/src/sinks/splunk_hec/metrics/config.rs b/src/sinks/splunk_hec/metrics/config.rs index cc64656c68aff..a100830585a21 100644 --- a/src/sinks/splunk_hec/metrics/config.rs +++ b/src/sinks/splunk_hec/metrics/config.rs @@ -181,7 +181,7 @@ impl HecMetricsSinkConfig { compression: self.compression, }; - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); + let request_settings = self.request.into_settings(); let http_request_builder = Arc::new(HttpRequestBuilder::new( self.endpoint.clone(), EndpointTarget::default(), diff --git a/src/sinks/util/adaptive_concurrency/controller.rs b/src/sinks/util/adaptive_concurrency/controller.rs index 8f14408e2bac6..7bc6cec55cc30 100644 --- a/src/sinks/util/adaptive_concurrency/controller.rs +++ b/src/sinks/util/adaptive_concurrency/controller.rs @@ -68,7 +68,7 @@ impl Controller { // If a `concurrency` is specified, it becomes both the // current limit and the maximum, effectively bypassing all the // mechanisms. Otherwise, the current limit is set to 1 and the - // maximum to MAX_CONCURRENCY. + // maximum to `settings.max_concurrency_limit`. let current_limit = concurrency.unwrap_or(settings.initial_concurrency); Self { semaphore: Arc::new(ShrinkableSemaphore::new(current_limit)), @@ -226,7 +226,7 @@ impl Controller { // concurrency limit. Note that we only check this if we had // requests to go beyond the current limit to prevent // increasing the limit beyond what we have evidence for. - if inner.current_limit < super::MAX_CONCURRENCY + if inner.current_limit < self.settings.max_concurrency_limit && inner.reached_limit && !inner.had_back_pressure && current_rtt.is_some() diff --git a/src/sinks/util/adaptive_concurrency/mod.rs b/src/sinks/util/adaptive_concurrency/mod.rs index dedbc8d1221b1..6b1e21439ffaa 100644 --- a/src/sinks/util/adaptive_concurrency/mod.rs +++ b/src/sinks/util/adaptive_concurrency/mod.rs @@ -9,10 +9,6 @@ mod service; #[cfg(test)] pub mod tests; -// Make sure to update the max range of the `AdaptiveConcurrencySettings::initial_concurrency` when changing -// this constant. -pub(super) const MAX_CONCURRENCY: usize = 200; - pub(crate) use layer::AdaptiveConcurrencyLimitLayer; pub(crate) use service::AdaptiveConcurrencyLimit; use vector_lib::configurable::configurable_component; @@ -36,7 +32,7 @@ pub struct AdaptiveConcurrencySettings { /// It is recommended to set this value to your service's average limit if you're seeing that it takes a /// long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the /// `adaptive_concurrency_limit` metric. - #[configurable(validation(range(min = 1, max = 200)))] + #[configurable(validation(range(min = 1)))] #[serde(default = "default_initial_concurrency")] pub(super) initial_concurrency: usize, @@ -72,6 +68,13 @@ pub struct AdaptiveConcurrencySettings { #[configurable(validation(range(min = 0.0)))] #[serde(default = "default_rtt_deviation_scale")] pub(super) rtt_deviation_scale: f64, + + /// The maximum concurrency limit. + /// + /// The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + #[configurable(validation(range(min = 1)))] + #[serde(default = "default_max_concurrency_limit")] + pub(super) max_concurrency_limit: usize, } const fn default_initial_concurrency() -> usize { @@ -90,10 +93,8 @@ const fn default_rtt_deviation_scale() -> f64 { 2.5 } -impl AdaptiveConcurrencySettings { - pub const fn max_concurrency() -> usize { - MAX_CONCURRENCY - } +const fn default_max_concurrency_limit() -> usize { + 200 } impl Default for AdaptiveConcurrencySettings { @@ -103,6 +104,7 @@ impl Default for AdaptiveConcurrencySettings { decrease_ratio: default_decrease_ratio(), ewma_alpha: default_ewma_alpha(), rtt_deviation_scale: default_rtt_deviation_scale(), + max_concurrency_limit: default_max_concurrency_limit(), } } } diff --git a/src/sinks/util/adaptive_concurrency/tests.rs b/src/sinks/util/adaptive_concurrency/tests.rs index 9a13f3c8f4831..12d69fa103f64 100644 --- a/src/sinks/util/adaptive_concurrency/tests.rs +++ b/src/sinks/util/adaptive_concurrency/tests.rs @@ -178,7 +178,7 @@ impl SinkConfig for TestConfig { batch_settings.size.events = 1; batch_settings.timeout = Duration::from_secs(9999); - let request = self.request.unwrap_with(&TowerRequestConfig::default()); + let request = self.request.into_settings(); let sink = request .batch_sink( TestRetryLogic, @@ -415,8 +415,8 @@ async fn run_test(params: TestParams) -> TestResults { let test_config = TestConfig { request: TowerRequestConfig { concurrency: params.concurrency, - rate_limit_num: Some(9999), - timeout_secs: Some(1), + rate_limit_num: 9999, + timeout_secs: 1, retry_jitter_mode: JitterMode::None, ..Default::default() }, diff --git a/src/sinks/util/service.rs b/src/sinks/util/service.rs index a9ee710302391..17e9fc303bd9a 100644 --- a/src/sinks/util/service.rs +++ b/src/sinks/util/service.rs @@ -15,7 +15,7 @@ use tower::{ use vector_lib::configurable::configurable_component; pub use crate::sinks::util::service::{ - concurrency::{concurrency_is_adaptive, Concurrency}, + concurrency::Concurrency, health::{HealthConfig, HealthLogic, HealthService}, map::Map, }; @@ -84,6 +84,21 @@ impl ServiceBuilderExt for ServiceBuilder { } } +pub trait TowerRequestConfigDefaults { + const CONCURRENCY: Concurrency = Concurrency::Adaptive; + const TIMEOUT_SECS: u64 = 60; + const RATE_LIMIT_DURATION_SECS: u64 = 1; + const RATE_LIMIT_NUM: u64 = i64::max_value() as u64; // i64 avoids TOML deserialize issue + const RETRY_ATTEMPTS: usize = isize::max_value() as usize; // isize avoids TOML deserialize issue + const RETRY_MAX_DURATION_SECS: u64 = 30; + const RETRY_INITIAL_BACKOFF_SECS: u64 = 1; +} + +#[derive(Clone, Copy, Debug)] +pub struct GlobalTowerRequestConfigDefaults; + +impl TowerRequestConfigDefaults for GlobalTowerRequestConfigDefaults {} + /// Middleware settings for outbound requests. /// /// Various settings can be configured, such as concurrency and rate limits, timeouts, retry behavior, etc. @@ -93,10 +108,10 @@ impl ServiceBuilderExt for ServiceBuilder { #[configurable_component] #[configurable(metadata(docs::advanced))] #[derive(Clone, Copy, Debug)] -pub struct TowerRequestConfig { +pub struct TowerRequestConfig { #[configurable(derived)] - #[serde(default = "default_concurrency")] - #[serde(skip_serializing_if = "concurrency_is_adaptive")] + #[serde(default = "default_concurrency::")] + #[serde(skip_serializing_if = "concurrency_is_default::")] pub concurrency: Concurrency, /// The time a request can take before being aborted. @@ -104,42 +119,40 @@ pub struct TowerRequestConfig { /// Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could /// create orphaned requests, pile on retries, and result in duplicate data downstream. #[configurable(metadata(docs::type_unit = "seconds"))] - #[serde(default = "default_timeout_secs")] #[configurable(metadata(docs::human_name = "Timeout"))] - pub timeout_secs: Option, + #[serde(default = "default_timeout_secs::")] + pub timeout_secs: u64, /// The time window used for the `rate_limit_num` option. #[configurable(metadata(docs::type_unit = "seconds"))] - #[serde(default = "default_rate_limit_duration_secs")] #[configurable(metadata(docs::human_name = "Rate Limit Duration"))] - pub rate_limit_duration_secs: Option, + #[serde(default = "default_rate_limit_duration_secs::")] + pub rate_limit_duration_secs: u64, /// The maximum number of requests allowed within the `rate_limit_duration_secs` time window. #[configurable(metadata(docs::type_unit = "requests"))] - #[serde(default = "default_rate_limit_num")] #[configurable(metadata(docs::human_name = "Rate Limit Number"))] - pub rate_limit_num: Option, + #[serde(default = "default_rate_limit_num::")] + pub rate_limit_num: u64, /// The maximum number of retries to make for failed requests. - /// - /// The default, for all intents and purposes, represents an infinite number of retries. #[configurable(metadata(docs::type_unit = "retries"))] - #[serde(default = "default_retry_attempts")] - pub retry_attempts: Option, + #[serde(default = "default_retry_attempts::")] + pub retry_attempts: usize, /// The maximum amount of time to wait between retries. #[configurable(metadata(docs::type_unit = "seconds"))] - #[serde(default = "default_retry_max_duration_secs")] #[configurable(metadata(docs::human_name = "Max Retry Duration"))] - pub retry_max_duration_secs: Option, + #[serde(default = "default_retry_max_duration_secs::")] + pub retry_max_duration_secs: u64, /// The amount of time to wait before attempting the first retry for a failed request. /// /// After the first retry has failed, the fibonacci sequence is used to select future backoffs. #[configurable(metadata(docs::type_unit = "seconds"))] - #[serde(default = "default_retry_initial_backoff_secs")] #[configurable(metadata(docs::human_name = "Retry Initial Backoff"))] - pub retry_initial_backoff_secs: Option, + #[serde(default = "default_retry_initial_backoff_secs::")] + pub retry_initial_backoff_secs: u64, #[configurable(derived)] #[serde(default)] @@ -148,130 +161,72 @@ pub struct TowerRequestConfig { #[configurable(derived)] #[serde(default)] pub adaptive_concurrency: AdaptiveConcurrencySettings, + + #[serde(skip)] + pub _d: PhantomData, +} + +const fn default_concurrency() -> Concurrency { + D::CONCURRENCY } -const fn default_concurrency() -> Concurrency { - Concurrency::Adaptive +fn concurrency_is_default(concurrency: &Concurrency) -> bool { + *concurrency == D::CONCURRENCY } -const fn default_timeout_secs() -> Option { - Some(60) +const fn default_timeout_secs() -> u64 { + D::TIMEOUT_SECS } -const fn default_rate_limit_duration_secs() -> Option { - Some(1) +const fn default_rate_limit_duration_secs() -> u64 { + D::RATE_LIMIT_DURATION_SECS } -const fn default_rate_limit_num() -> Option { - // i64 avoids TOML deserialize issue - Some(i64::max_value() as u64) +const fn default_rate_limit_num() -> u64 { + D::RATE_LIMIT_NUM } -const fn default_retry_attempts() -> Option { - // i64 avoids TOML deserialize issue - Some(isize::max_value() as usize) +const fn default_retry_attempts() -> usize { + D::RETRY_ATTEMPTS } -const fn default_retry_max_duration_secs() -> Option { - Some(3_600) +const fn default_retry_max_duration_secs() -> u64 { + D::RETRY_MAX_DURATION_SECS } -const fn default_retry_initial_backoff_secs() -> Option { - Some(1) +const fn default_retry_initial_backoff_secs() -> u64 { + D::RETRY_INITIAL_BACKOFF_SECS } -impl Default for TowerRequestConfig { +impl Default for TowerRequestConfig { fn default() -> Self { Self { - concurrency: default_concurrency(), - timeout_secs: default_timeout_secs(), - rate_limit_duration_secs: default_rate_limit_duration_secs(), - rate_limit_num: default_rate_limit_num(), - retry_attempts: default_retry_attempts(), - retry_max_duration_secs: default_retry_max_duration_secs(), - retry_initial_backoff_secs: default_retry_initial_backoff_secs(), + concurrency: default_concurrency::(), + timeout_secs: default_timeout_secs::(), + rate_limit_duration_secs: default_rate_limit_duration_secs::(), + rate_limit_num: default_rate_limit_num::(), + retry_attempts: default_retry_attempts::(), + retry_max_duration_secs: default_retry_max_duration_secs::(), + retry_initial_backoff_secs: default_retry_initial_backoff_secs::(), adaptive_concurrency: AdaptiveConcurrencySettings::default(), retry_jitter_mode: JitterMode::default(), - } - } -} -impl TowerRequestConfig { - pub fn new(concurrency: Concurrency) -> Self { - Self { - concurrency, - ..Default::default() + _d: PhantomData, } } +} - pub const fn timeout_secs(mut self, timeout_secs: u64) -> Self { - self.timeout_secs = Some(timeout_secs); - self - } - - pub const fn rate_limit_duration_secs(mut self, rate_limit_duration_secs: u64) -> Self { - self.rate_limit_duration_secs = Some(rate_limit_duration_secs); - self - } - - pub const fn rate_limit_num(mut self, rate_limit_num: u64) -> Self { - self.rate_limit_num = Some(rate_limit_num); - self - } - - pub const fn retry_attempts(mut self, retry_attempts: usize) -> Self { - self.retry_attempts = Some(retry_attempts); - self - } - - pub const fn retry_max_duration_secs(mut self, retry_max_duration_secs: u64) -> Self { - self.retry_max_duration_secs = Some(retry_max_duration_secs); - self - } - - pub const fn retry_initial_backoff_secs(mut self, retry_initial_backoff_secs: u64) -> Self { - self.retry_initial_backoff_secs = Some(retry_initial_backoff_secs); - self - } - - pub fn unwrap_with(&self, defaults: &Self) -> TowerRequestSettings { +impl TowerRequestConfig { + pub const fn into_settings(&self) -> TowerRequestSettings { // the unwrap() calls below are safe because the final defaults are always Some<> TowerRequestSettings { - concurrency: self.concurrency.parse_concurrency(defaults.concurrency), - timeout: Duration::from_secs( - self.timeout_secs - .or(defaults.timeout_secs) - .or(default_timeout_secs()) - .unwrap(), - ), - rate_limit_duration: Duration::from_secs( - self.rate_limit_duration_secs - .or(defaults.rate_limit_duration_secs) - .or(default_rate_limit_duration_secs()) - .unwrap(), - ), - rate_limit_num: self - .rate_limit_num - .or(defaults.rate_limit_num) - .or(default_rate_limit_num()) - .unwrap(), - retry_attempts: self - .retry_attempts - .or(defaults.retry_attempts) - .or(default_retry_attempts()) - .unwrap(), - retry_max_duration_secs: Duration::from_secs( - self.retry_max_duration_secs - .or(defaults.retry_max_duration_secs) - .or(default_retry_max_duration_secs()) - .unwrap(), - ), - retry_initial_backoff_secs: Duration::from_secs( - self.retry_initial_backoff_secs - .or(defaults.retry_initial_backoff_secs) - .or(default_retry_initial_backoff_secs()) - .unwrap(), - ), + concurrency: self.concurrency.parse_concurrency(), + timeout: Duration::from_secs(self.timeout_secs), + rate_limit_duration: Duration::from_secs(self.rate_limit_duration_secs), + rate_limit_num: self.rate_limit_num, + retry_attempts: self.retry_attempts, + retry_max_duration: Duration::from_secs(self.retry_max_duration_secs), + retry_initial_backoff: Duration::from_secs(self.retry_initial_backoff_secs), adaptive_concurrency: self.adaptive_concurrency, retry_jitter_mode: self.retry_jitter_mode, } @@ -285,8 +240,8 @@ pub struct TowerRequestSettings { pub rate_limit_duration: Duration, pub rate_limit_num: u64, pub retry_attempts: usize, - pub retry_max_duration_secs: Duration, - pub retry_initial_backoff_secs: Duration, + pub retry_max_duration: Duration, + pub retry_initial_backoff: Duration, pub adaptive_concurrency: AdaptiveConcurrencySettings, pub retry_jitter_mode: JitterMode, } @@ -295,8 +250,8 @@ impl TowerRequestSettings { pub fn retry_policy(&self, logic: L) -> FibonacciRetryPolicy { FibonacciRetryPolicy::new( self.retry_attempts, - self.retry_initial_backoff_secs, - self.retry_max_duration_secs, + self.retry_initial_backoff, + self.retry_max_duration, logic, self.retry_jitter_mode, ) @@ -467,7 +422,7 @@ mod tests { #[test] fn concurrency_param_works() { - let cfg = TowerRequestConfig::default(); + let cfg = TowerRequestConfig::::default(); let toml = toml::to_string(&cfg).unwrap(); toml::from_str::(&toml).expect("Default config failed"); @@ -482,6 +437,10 @@ mod tests { .expect("Adaptive concurrency setting failed"); assert_eq!(cfg.concurrency, Concurrency::Adaptive); + let cfg = toml::from_str::(r#"concurrency = "none""#) + .expect("None concurrency setting failed"); + assert_eq!(cfg.concurrency, Concurrency::None); + toml::from_str::(r#"concurrency = "broken""#) .expect_err("Invalid concurrency setting didn't fail"); @@ -493,19 +452,82 @@ mod tests { } #[test] - fn config_merging_defaults_concurrency_to_none_if_unset() { - let cfg = TowerRequestConfig::default().unwrap_with(&TowerRequestConfig::default()); + fn into_settings_with_global_defaults() { + let cfg = TowerRequestConfig::::default(); + let settings = cfg.into_settings(); + + assert_eq!(settings.concurrency, None); + assert_eq!(settings.timeout, Duration::from_secs(60)); + assert_eq!(settings.rate_limit_duration, Duration::from_secs(1)); + assert_eq!(settings.rate_limit_num, i64::max_value() as u64); + assert_eq!(settings.retry_attempts, isize::max_value() as usize); + assert_eq!(settings.retry_max_duration, Duration::from_secs(30)); + assert_eq!(settings.retry_initial_backoff, Duration::from_secs(1)); + } + + #[derive(Clone, Copy, Debug)] + pub struct TestTowerRequestConfigDefaults; + + impl TowerRequestConfigDefaults for TestTowerRequestConfigDefaults { + const CONCURRENCY: Concurrency = Concurrency::None; + const TIMEOUT_SECS: u64 = 1; + const RATE_LIMIT_DURATION_SECS: u64 = 2; + const RATE_LIMIT_NUM: u64 = 3; + const RETRY_ATTEMPTS: usize = 4; + const RETRY_MAX_DURATION_SECS: u64 = 5; + const RETRY_INITIAL_BACKOFF_SECS: u64 = 6; + } + + #[test] + fn into_settings_with_overridden_defaults() { + let cfg = TowerRequestConfig::::default(); + let settings = cfg.into_settings(); + + assert_eq!(settings.concurrency, Some(1)); + assert_eq!(settings.timeout, Duration::from_secs(1)); + assert_eq!(settings.rate_limit_duration, Duration::from_secs(2)); + assert_eq!(settings.rate_limit_num, 3); + assert_eq!(settings.retry_attempts, 4); + assert_eq!(settings.retry_max_duration, Duration::from_secs(5)); + assert_eq!(settings.retry_initial_backoff, Duration::from_secs(6)); + } + + #[test] + fn into_settings_with_populated_config() { + // Populate with values not equal to the global defaults. + let cfg = toml::from_str::( + r#" concurrency = 16 + timeout_secs = 1 + rate_limit_duration_secs = 2 + rate_limit_num = 3 + retry_attempts = 4 + retry_max_duration_secs = 5 + retry_initial_backoff_secs = 6 + "#, + ) + .expect("Config failed to parse"); - assert_eq!(cfg.concurrency, None); + // Merge with defaults + let settings = cfg.into_settings(); + assert_eq!( + settings.concurrency, + Concurrency::Fixed(16).parse_concurrency() + ); + assert_eq!(settings.timeout, Duration::from_secs(1)); + assert_eq!(settings.rate_limit_duration, Duration::from_secs(2)); + assert_eq!(settings.rate_limit_num, 3); + assert_eq!(settings.retry_attempts, 4); + assert_eq!(settings.retry_max_duration, Duration::from_secs(5)); + assert_eq!(settings.retry_initial_backoff, Duration::from_secs(6)); } #[tokio::test] async fn partition_sink_retry_concurrency() { - let cfg = TowerRequestConfig { + let cfg: TowerRequestConfig = TowerRequestConfig { concurrency: Concurrency::Fixed(1), ..TowerRequestConfig::default() }; - let settings = cfg.unwrap_with(&TowerRequestConfig::default()); + let settings = cfg.into_settings(); let sent_requests = Arc::new(Mutex::new(Vec::new())); diff --git a/src/sinks/util/service/concurrency.rs b/src/sinks/util/service/concurrency.rs index cc77ef0338fc0..6bb9586ccc7f6 100644 --- a/src/sinks/util/service/concurrency.rs +++ b/src/sinks/util/service/concurrency.rs @@ -56,26 +56,15 @@ impl Default for Concurrency { } impl Concurrency { - const fn if_adaptive(self, other: Self) -> Self { + pub const fn parse_concurrency(&self) -> Option { match self { - Self::Adaptive => other, - _ => self, - } - } - - pub const fn parse_concurrency(&self, other: Self) -> Option { - match self.if_adaptive(other) { Concurrency::None => Some(1), Concurrency::Adaptive => None, - Concurrency::Fixed(limit) => Some(limit), + Concurrency::Fixed(limit) => Some(*limit), } } } -pub const fn concurrency_is_adaptive(concurrency: &Concurrency) -> bool { - matches!(concurrency, Concurrency::Adaptive) -} - impl<'de> Deserialize<'de> for Concurrency { // Deserialize either a positive integer or the string "adaptive" fn deserialize(deserializer: D) -> Result diff --git a/src/sinks/vector/config.rs b/src/sinks/vector/config.rs index 095eda4a4503c..9f2664c3a11b3 100644 --- a/src/sinks/vector/config.rs +++ b/src/sinks/vector/config.rs @@ -123,7 +123,7 @@ impl SinkConfig for VectorConfig { let healthcheck_client = VectorService::new(client.clone(), healthcheck_uri, false); let healthcheck = healthcheck(healthcheck_client, cx.healthcheck); let service = VectorService::new(client, uri, self.compression); - let request_settings = self.request.unwrap_with(&TowerRequestConfig::default()); + let request_settings = self.request.into_settings(); let batch_settings = self.batch.into_batcher_settings()?; let service = ServiceBuilder::new() diff --git a/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md b/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md index 2418624c1e34c..9b1c171958221 100644 --- a/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md +++ b/website/content/en/highlights/2023-12-19-0-35-0-upgrade-guide.md @@ -21,6 +21,7 @@ and **potentially impactful changes**: 1. [Update `component_sent_bytes_total` to correctly report uncompressed bytes for all sinks](#component-sent-bytes-total) 1. [Update `component_received_bytes_total` to correctly report decompressed bytes for all sources](#component-received-bytes-total) +1. [Update default values for the `request.retry_max_duration_secs` and `request.rate_limit_num` sink configuration options](#request-config-options) We cover them below to help you upgrade quickly: @@ -58,3 +59,20 @@ report uncompressed bytes, rather than compressed bytes, for the `component_sent The Heroku Logs, HTTP Server, Prometheus Remote Write, and Splunk HEC sources now correctly report decompressed bytes, rather than compressed bytes, for the `component_received_bytes_total` internal metric. + +#### Update default values for the `request.retry_max_duration_secs` and `request.rate_limit_num` sink configuration options {#request-config-options} + +The `request.retry_max_duration_secs` config option previously defaulted to `3600` seconds. It now defaults to `30` seconds. + +Also, a bug was fixed that prevented component-level default values from being applied. In particular, this updates the default value +for `request.rate_limit_num` for the following sinks: + +| Sink | Default `request.rate_limit_num` Value | Previous Limit | +|--------------------------------|----------------------------------------|----------------| +| AWS Cloudwatch Metrics Sink | 150 | No limit | +| Azure Blob Storage Sink | 250 | No limit | +| GCP Chronicle Unstructured Sink| 1000 | No limit | +| GCP Cloud Storage Sink | 1000 | No limit | +| GCP Cloud Monitoring Sink | 1000 | No limit | + +Note that all changes described above are reflected in the component reference documentation. diff --git a/website/cue/reference/components/sinks/base/appsignal.cue b/website/cue/reference/components/sinks/base/appsignal.cue index 4101865b14d3f..0046a653a3188 100644 --- a/website/cue/reference/components/sinks/base/appsignal.cue +++ b/website/cue/reference/components/sinks/base/appsignal.cue @@ -192,6 +192,15 @@ base: components: sinks: appsignal: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -252,12 +261,8 @@ base: components: sinks: appsignal: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -299,7 +304,7 @@ base: components: sinks: appsignal: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue index f885c3daeefab..834e43e01175a 100644 --- a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue +++ b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue @@ -527,6 +527,15 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -602,12 +611,8 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -649,7 +654,7 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/aws_cloudwatch_metrics.cue b/website/cue/reference/components/sinks/base/aws_cloudwatch_metrics.cue index 3cb01f4f94252..b487d069d375a 100644 --- a/website/cue/reference/components/sinks/base/aws_cloudwatch_metrics.cue +++ b/website/cue/reference/components/sinks/base/aws_cloudwatch_metrics.cue @@ -276,6 +276,15 @@ base: components: sinks: aws_cloudwatch_metrics: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -331,17 +340,13 @@ base: components: sinks: aws_cloudwatch_metrics: configuration: { description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." required: false type: uint: { - default: 9223372036854775807 + default: 150 unit: "requests" } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -383,7 +388,7 @@ base: components: sinks: aws_cloudwatch_metrics: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue index 424f421b127c7..b03ddca27fbe3 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_firehose.cue @@ -500,6 +500,15 @@ base: components: sinks: aws_kinesis_firehose: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -560,12 +569,8 @@ base: components: sinks: aws_kinesis_firehose: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -607,7 +612,7 @@ base: components: sinks: aws_kinesis_firehose: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue index 193c26642c0fc..ba28c2931a099 100644 --- a/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue +++ b/website/cue/reference/components/sinks/base/aws_kinesis_streams.cue @@ -509,6 +509,15 @@ base: components: sinks: aws_kinesis_streams: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -569,12 +578,8 @@ base: components: sinks: aws_kinesis_streams: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -616,7 +621,7 @@ base: components: sinks: aws_kinesis_streams: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/aws_s3.cue b/website/cue/reference/components/sinks/base/aws_s3.cue index bb6e87754bf3d..762bb5339669a 100644 --- a/website/cue/reference/components/sinks/base/aws_s3.cue +++ b/website/cue/reference/components/sinks/base/aws_s3.cue @@ -746,6 +746,15 @@ base: components: sinks: aws_s3: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -806,12 +815,8 @@ base: components: sinks: aws_s3: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -853,7 +858,7 @@ base: components: sinks: aws_s3: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/aws_sns.cue b/website/cue/reference/components/sinks/base/aws_sns.cue index 3d64fa581e726..e582659904dd4 100644 --- a/website/cue/reference/components/sinks/base/aws_sns.cue +++ b/website/cue/reference/components/sinks/base/aws_sns.cue @@ -452,6 +452,15 @@ base: components: sinks: aws_sns: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -512,12 +521,8 @@ base: components: sinks: aws_sns: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -559,7 +564,7 @@ base: components: sinks: aws_sns: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/aws_sqs.cue b/website/cue/reference/components/sinks/base/aws_sqs.cue index 6e2bf39d8b01d..06eea12139980 100644 --- a/website/cue/reference/components/sinks/base/aws_sqs.cue +++ b/website/cue/reference/components/sinks/base/aws_sqs.cue @@ -457,6 +457,15 @@ base: components: sinks: aws_sqs: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -517,12 +526,8 @@ base: components: sinks: aws_sqs: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -564,7 +569,7 @@ base: components: sinks: aws_sqs: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/axiom.cue b/website/cue/reference/components/sinks/base/axiom.cue index d17bbfd74cb78..910ef98a29a39 100644 --- a/website/cue/reference/components/sinks/base/axiom.cue +++ b/website/cue/reference/components/sinks/base/axiom.cue @@ -124,6 +124,15 @@ base: components: sinks: axiom: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -199,12 +208,8 @@ base: components: sinks: axiom: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -246,7 +251,7 @@ base: components: sinks: axiom: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/azure_blob.cue b/website/cue/reference/components/sinks/base/azure_blob.cue index 6a0830ccf29fe..10777b6a1cc0e 100644 --- a/website/cue/reference/components/sinks/base/azure_blob.cue +++ b/website/cue/reference/components/sinks/base/azure_blob.cue @@ -494,6 +494,15 @@ base: components: sinks: azure_blob: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -549,17 +558,13 @@ base: components: sinks: azure_blob: configuration: { description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." required: false type: uint: { - default: 9223372036854775807 + default: 250 unit: "requests" } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -601,7 +606,7 @@ base: components: sinks: azure_blob: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/azure_monitor_logs.cue b/website/cue/reference/components/sinks/base/azure_monitor_logs.cue index 9ec8bae1899a2..66564517290cd 100644 --- a/website/cue/reference/components/sinks/base/azure_monitor_logs.cue +++ b/website/cue/reference/components/sinks/base/azure_monitor_logs.cue @@ -183,6 +183,15 @@ base: components: sinks: azure_monitor_logs: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -243,12 +252,8 @@ base: components: sinks: azure_monitor_logs: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -290,7 +295,7 @@ base: components: sinks: azure_monitor_logs: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/clickhouse.cue b/website/cue/reference/components/sinks/base/clickhouse.cue index ff8eef242d1d1..bc4617a4eeeb8 100644 --- a/website/cue/reference/components/sinks/base/clickhouse.cue +++ b/website/cue/reference/components/sinks/base/clickhouse.cue @@ -241,6 +241,15 @@ base: components: sinks: clickhouse: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -301,12 +310,8 @@ base: components: sinks: clickhouse: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -348,7 +353,7 @@ base: components: sinks: clickhouse: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/databend.cue b/website/cue/reference/components/sinks/base/databend.cue index 3c53c1d76dad2..8302802b59f18 100644 --- a/website/cue/reference/components/sinks/base/databend.cue +++ b/website/cue/reference/components/sinks/base/databend.cue @@ -339,6 +339,15 @@ base: components: sinks: databend: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -399,12 +408,8 @@ base: components: sinks: databend: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -446,7 +451,7 @@ base: components: sinks: databend: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/datadog_events.cue b/website/cue/reference/components/sinks/base/datadog_events.cue index c139cc9ae924e..2f109aee91a82 100644 --- a/website/cue/reference/components/sinks/base/datadog_events.cue +++ b/website/cue/reference/components/sinks/base/datadog_events.cue @@ -107,6 +107,15 @@ base: components: sinks: datadog_events: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -167,12 +176,8 @@ base: components: sinks: datadog_events: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -214,7 +219,7 @@ base: components: sinks: datadog_events: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/datadog_logs.cue b/website/cue/reference/components/sinks/base/datadog_logs.cue index 6bcbb91065ba1..0d00aa5b6bc04 100644 --- a/website/cue/reference/components/sinks/base/datadog_logs.cue +++ b/website/cue/reference/components/sinks/base/datadog_logs.cue @@ -195,6 +195,15 @@ base: components: sinks: datadog_logs: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -270,12 +279,8 @@ base: components: sinks: datadog_logs: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -317,7 +322,7 @@ base: components: sinks: datadog_logs: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/datadog_metrics.cue b/website/cue/reference/components/sinks/base/datadog_metrics.cue index a27fd554da6e3..60a3f098a76dd 100644 --- a/website/cue/reference/components/sinks/base/datadog_metrics.cue +++ b/website/cue/reference/components/sinks/base/datadog_metrics.cue @@ -149,6 +149,15 @@ base: components: sinks: datadog_metrics: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -209,12 +218,8 @@ base: components: sinks: datadog_metrics: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -256,7 +261,7 @@ base: components: sinks: datadog_metrics: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/datadog_traces.cue b/website/cue/reference/components/sinks/base/datadog_traces.cue index da6a8374162d5..a5b3bef5190d3 100644 --- a/website/cue/reference/components/sinks/base/datadog_traces.cue +++ b/website/cue/reference/components/sinks/base/datadog_traces.cue @@ -173,6 +173,15 @@ base: components: sinks: datadog_traces: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -233,12 +242,8 @@ base: components: sinks: datadog_traces: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -280,7 +285,7 @@ base: components: sinks: datadog_traces: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/elasticsearch.cue b/website/cue/reference/components/sinks/base/elasticsearch.cue index 0839e8e27126e..2ef047689c975 100644 --- a/website/cue/reference/components/sinks/base/elasticsearch.cue +++ b/website/cue/reference/components/sinks/base/elasticsearch.cue @@ -591,6 +591,15 @@ base: components: sinks: elasticsearch: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -666,12 +675,8 @@ base: components: sinks: elasticsearch: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -713,7 +718,7 @@ base: components: sinks: elasticsearch: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue b/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue index 2a6d9ba648a57..68c3754b2a5e1 100644 --- a/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue +++ b/website/cue/reference/components/sinks/base/gcp_chronicle_unstructured.cue @@ -413,6 +413,15 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -468,17 +477,13 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: { description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." required: false type: uint: { - default: 9223372036854775807 + default: 1000 unit: "requests" } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -520,7 +525,7 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue index 231d6dba96eb2..102866a47f550 100644 --- a/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/base/gcp_cloud_storage.cue @@ -577,6 +577,15 @@ base: components: sinks: gcp_cloud_storage: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -632,17 +641,13 @@ base: components: sinks: gcp_cloud_storage: configuration: { description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." required: false type: uint: { - default: 9223372036854775807 + default: 1000 unit: "requests" } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -684,7 +689,7 @@ base: components: sinks: gcp_cloud_storage: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/gcp_pubsub.cue b/website/cue/reference/components/sinks/base/gcp_pubsub.cue index c46639fe97f55..0eb77ad77cea0 100644 --- a/website/cue/reference/components/sinks/base/gcp_pubsub.cue +++ b/website/cue/reference/components/sinks/base/gcp_pubsub.cue @@ -404,6 +404,15 @@ base: components: sinks: gcp_pubsub: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -464,12 +473,8 @@ base: components: sinks: gcp_pubsub: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -511,7 +516,7 @@ base: components: sinks: gcp_pubsub: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/gcp_stackdriver_logs.cue b/website/cue/reference/components/sinks/base/gcp_stackdriver_logs.cue index c173f99b4ca7a..ba6457243ab07 100644 --- a/website/cue/reference/components/sinks/base/gcp_stackdriver_logs.cue +++ b/website/cue/reference/components/sinks/base/gcp_stackdriver_logs.cue @@ -229,6 +229,15 @@ base: components: sinks: gcp_stackdriver_logs: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -284,17 +293,13 @@ base: components: sinks: gcp_stackdriver_logs: configuration: { description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." required: false type: uint: { - default: 9223372036854775807 + default: 1000 unit: "requests" } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -336,7 +341,7 @@ base: components: sinks: gcp_stackdriver_logs: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/gcp_stackdriver_metrics.cue b/website/cue/reference/components/sinks/base/gcp_stackdriver_metrics.cue index ab30447d4226e..4d37b29ad7514 100644 --- a/website/cue/reference/components/sinks/base/gcp_stackdriver_metrics.cue +++ b/website/cue/reference/components/sinks/base/gcp_stackdriver_metrics.cue @@ -167,6 +167,15 @@ base: components: sinks: gcp_stackdriver_metrics: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -222,17 +231,13 @@ base: components: sinks: gcp_stackdriver_metrics: configuration: { description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window." required: false type: uint: { - default: 9223372036854775807 + default: 1000 unit: "requests" } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -274,7 +279,7 @@ base: components: sinks: gcp_stackdriver_metrics: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/greptimedb.cue b/website/cue/reference/components/sinks/base/greptimedb.cue index 101d10de74e96..9d4ce20f8a530 100644 --- a/website/cue/reference/components/sinks/base/greptimedb.cue +++ b/website/cue/reference/components/sinks/base/greptimedb.cue @@ -155,6 +155,15 @@ base: components: sinks: greptimedb: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -215,12 +224,8 @@ base: components: sinks: greptimedb: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -262,7 +267,7 @@ base: components: sinks: greptimedb: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/honeycomb.cue b/website/cue/reference/components/sinks/base/honeycomb.cue index 34a42cc46c904..29dbc538e8916 100644 --- a/website/cue/reference/components/sinks/base/honeycomb.cue +++ b/website/cue/reference/components/sinks/base/honeycomb.cue @@ -152,6 +152,15 @@ base: components: sinks: honeycomb: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -212,12 +221,8 @@ base: components: sinks: honeycomb: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -259,7 +264,7 @@ base: components: sinks: honeycomb: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/http.cue b/website/cue/reference/components/sinks/base/http.cue index f5606049f26f1..c272ab03bbd72 100644 --- a/website/cue/reference/components/sinks/base/http.cue +++ b/website/cue/reference/components/sinks/base/http.cue @@ -514,6 +514,15 @@ base: components: sinks: http: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -589,12 +598,8 @@ base: components: sinks: http: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -636,7 +641,7 @@ base: components: sinks: http: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/humio_logs.cue b/website/cue/reference/components/sinks/base/humio_logs.cue index 2150718e1676a..ef75fd53b3701 100644 --- a/website/cue/reference/components/sinks/base/humio_logs.cue +++ b/website/cue/reference/components/sinks/base/humio_logs.cue @@ -453,6 +453,15 @@ base: components: sinks: humio_logs: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -513,12 +522,8 @@ base: components: sinks: humio_logs: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -560,7 +565,7 @@ base: components: sinks: humio_logs: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/humio_metrics.cue b/website/cue/reference/components/sinks/base/humio_metrics.cue index 33a5fe17fbdb1..f41836314e6b3 100644 --- a/website/cue/reference/components/sinks/base/humio_metrics.cue +++ b/website/cue/reference/components/sinks/base/humio_metrics.cue @@ -255,6 +255,15 @@ base: components: sinks: humio_metrics: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -315,12 +324,8 @@ base: components: sinks: humio_metrics: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -362,7 +367,7 @@ base: components: sinks: humio_metrics: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/influxdb_logs.cue b/website/cue/reference/components/sinks/base/influxdb_logs.cue index 99fb4a62f06a0..7b817c559466c 100644 --- a/website/cue/reference/components/sinks/base/influxdb_logs.cue +++ b/website/cue/reference/components/sinks/base/influxdb_logs.cue @@ -232,6 +232,15 @@ base: components: sinks: influxdb_logs: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -292,12 +301,8 @@ base: components: sinks: influxdb_logs: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -339,7 +344,7 @@ base: components: sinks: influxdb_logs: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/influxdb_metrics.cue b/website/cue/reference/components/sinks/base/influxdb_metrics.cue index f09d1ab92fa4d..72f09ac1366d3 100644 --- a/website/cue/reference/components/sinks/base/influxdb_metrics.cue +++ b/website/cue/reference/components/sinks/base/influxdb_metrics.cue @@ -186,6 +186,15 @@ base: components: sinks: influxdb_metrics: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -246,12 +255,8 @@ base: components: sinks: influxdb_metrics: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -293,7 +298,7 @@ base: components: sinks: influxdb_metrics: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/logdna.cue b/website/cue/reference/components/sinks/base/logdna.cue index f703551f05607..3daaa54965547 100644 --- a/website/cue/reference/components/sinks/base/logdna.cue +++ b/website/cue/reference/components/sinks/base/logdna.cue @@ -195,6 +195,15 @@ base: components: sinks: logdna: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -255,12 +264,8 @@ base: components: sinks: logdna: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -302,7 +307,7 @@ base: components: sinks: logdna: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/loki.cue b/website/cue/reference/components/sinks/base/loki.cue index d7515db758f36..a51e528d88e1d 100644 --- a/website/cue/reference/components/sinks/base/loki.cue +++ b/website/cue/reference/components/sinks/base/loki.cue @@ -517,6 +517,15 @@ base: components: sinks: loki: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -577,12 +586,8 @@ base: components: sinks: loki: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -624,7 +629,7 @@ base: components: sinks: loki: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/mezmo.cue b/website/cue/reference/components/sinks/base/mezmo.cue index 6696c940d9616..8bba924aea1bc 100644 --- a/website/cue/reference/components/sinks/base/mezmo.cue +++ b/website/cue/reference/components/sinks/base/mezmo.cue @@ -195,6 +195,15 @@ base: components: sinks: mezmo: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -255,12 +264,8 @@ base: components: sinks: mezmo: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -302,7 +307,7 @@ base: components: sinks: mezmo: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/nats.cue b/website/cue/reference/components/sinks/base/nats.cue index f518541c0873f..3567dc4c98079 100644 --- a/website/cue/reference/components/sinks/base/nats.cue +++ b/website/cue/reference/components/sinks/base/nats.cue @@ -415,6 +415,15 @@ base: components: sinks: nats: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -441,7 +450,7 @@ base: components: sinks: nats: configuration: { required: false type: { string: { - default: "adaptive" + default: "none" enum: { adaptive: """ Concurrency will be managed by Vector's [Adaptive Request Concurrency][arc] feature. @@ -475,12 +484,8 @@ base: components: sinks: nats: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -522,7 +527,7 @@ base: components: sinks: nats: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/new_relic.cue b/website/cue/reference/components/sinks/base/new_relic.cue index 3eaff416c157b..86f75a1201553 100644 --- a/website/cue/reference/components/sinks/base/new_relic.cue +++ b/website/cue/reference/components/sinks/base/new_relic.cue @@ -206,6 +206,15 @@ base: components: sinks: new_relic: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -266,12 +275,8 @@ base: components: sinks: new_relic: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -313,7 +318,7 @@ base: components: sinks: new_relic: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/prometheus_remote_write.cue b/website/cue/reference/components/sinks/base/prometheus_remote_write.cue index 570cd97e7fdac..2ecd4d3899878 100644 --- a/website/cue/reference/components/sinks/base/prometheus_remote_write.cue +++ b/website/cue/reference/components/sinks/base/prometheus_remote_write.cue @@ -362,6 +362,15 @@ base: components: sinks: prometheus_remote_write: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -422,12 +431,8 @@ base: components: sinks: prometheus_remote_write: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -469,7 +474,7 @@ base: components: sinks: prometheus_remote_write: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/redis.cue b/website/cue/reference/components/sinks/base/redis.cue index bab8dc3f0b5ff..57d985b8416e1 100644 --- a/website/cue/reference/components/sinks/base/redis.cue +++ b/website/cue/reference/components/sinks/base/redis.cue @@ -408,6 +408,15 @@ base: components: sinks: redis: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -434,7 +443,7 @@ base: components: sinks: redis: configuration: { required: false type: { string: { - default: "adaptive" + default: "none" enum: { adaptive: """ Concurrency will be managed by Vector's [Adaptive Request Concurrency][arc] feature. @@ -468,12 +477,8 @@ base: components: sinks: redis: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -515,7 +520,7 @@ base: components: sinks: redis: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/sematext_logs.cue b/website/cue/reference/components/sinks/base/sematext_logs.cue index 380bbac33e7e7..5b24555d64c7f 100644 --- a/website/cue/reference/components/sinks/base/sematext_logs.cue +++ b/website/cue/reference/components/sinks/base/sematext_logs.cue @@ -162,6 +162,15 @@ base: components: sinks: sematext_logs: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -222,12 +231,8 @@ base: components: sinks: sematext_logs: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -269,7 +274,7 @@ base: components: sinks: sematext_logs: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/sematext_metrics.cue b/website/cue/reference/components/sinks/base/sematext_metrics.cue index 9e4d2647258f0..fbee85a150d51 100644 --- a/website/cue/reference/components/sinks/base/sematext_metrics.cue +++ b/website/cue/reference/components/sinks/base/sematext_metrics.cue @@ -144,6 +144,15 @@ base: components: sinks: sematext_metrics: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -204,12 +213,8 @@ base: components: sinks: sematext_metrics: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -251,7 +256,7 @@ base: components: sinks: sematext_metrics: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue index ba2cfd6ca3108..dd2921b1cca0d 100644 --- a/website/cue/reference/components/sinks/base/splunk_hec_logs.cue +++ b/website/cue/reference/components/sinks/base/splunk_hec_logs.cue @@ -505,6 +505,15 @@ base: components: sinks: splunk_hec_logs: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -565,12 +574,8 @@ base: components: sinks: splunk_hec_logs: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -612,7 +617,7 @@ base: components: sinks: splunk_hec_logs: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/splunk_hec_metrics.cue b/website/cue/reference/components/sinks/base/splunk_hec_metrics.cue index 591b0a1155b76..111c9ba088159 100644 --- a/website/cue/reference/components/sinks/base/splunk_hec_metrics.cue +++ b/website/cue/reference/components/sinks/base/splunk_hec_metrics.cue @@ -229,6 +229,15 @@ base: components: sinks: splunk_hec_metrics: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -289,12 +298,8 @@ base: components: sinks: splunk_hec_metrics: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -336,7 +341,7 @@ base: components: sinks: splunk_hec_metrics: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } } diff --git a/website/cue/reference/components/sinks/base/vector.cue b/website/cue/reference/components/sinks/base/vector.cue index 5ac22f2c719d6..49e4f9dcf8934 100644 --- a/website/cue/reference/components/sinks/base/vector.cue +++ b/website/cue/reference/components/sinks/base/vector.cue @@ -136,6 +136,15 @@ base: components: sinks: vector: configuration: { required: false type: uint: default: 1 } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit will not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } rtt_deviation_scale: { description: """ Scale of RTT deviations which are not considered anomalous. @@ -196,12 +205,8 @@ base: components: sinks: vector: configuration: { } } retry_attempts: { - description: """ - The maximum number of retries to make for failed requests. - - The default, for all intents and purposes, represents an infinite number of retries. - """ - required: false + description: "The maximum number of retries to make for failed requests." + required: false type: uint: { default: 9223372036854775807 unit: "retries" @@ -243,7 +248,7 @@ base: components: sinks: vector: configuration: { description: "The maximum amount of time to wait between retries." required: false type: uint: { - default: 3600 + default: 30 unit: "seconds" } }