diff --git a/src/app.rs b/src/app.rs
index 75893718347e1..18ac0c1798571 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -468,14 +468,14 @@ pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
diff --git a/src/sinks/amqp/integration_tests.rs b/src/sinks/amqp/integration_tests.rs
--- a/src/sinks/amqp/integration_tests.rs
+++ b/src/sinks/amqp/integration_tests.rs
@@ ... @@ fn make_config() -> AmqpSinkConfig {
@@ -129,6 +129,9 @@ async fn amqp_happy_path() {
     }
 
     assert_eq!(out.len(), input.len());
+
+    let input: HashSet<String> = HashSet::from_iter(input);
+    let out: HashSet<String> = HashSet::from_iter(out);
     assert_eq!(out, input);
 }
diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs
index 287b002b935f2..7039c6217993d 100644
--- a/src/sinks/amqp/sink.rs
+++ b/src/sinks/amqp/sink.rs
@@ -119,7 +119,7 @@ impl AmqpSink {
         input
             .filter_map(|event| std::future::ready(self.make_amqp_event(event)))
-            .request_builder(None, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async move {
                 match request {
                     Err(e) => {
diff --git a/src/sinks/appsignal/sink.rs b/src/sinks/appsignal/sink.rs
index b908ed757a45a..1ad0e3e0c8779 100644
--- a/src/sinks/appsignal/sink.rs
+++ b/src/sinks/appsignal/sink.rs
@@ -1,17 +1,6 @@
-use futures::{stream::BoxStream, StreamExt};
 use futures_util::future::ready;
-use tower::{Service, ServiceBuilder};
-use vector_core::{
-    event::Event,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-};
-use crate::{
-    codecs::Transformer,
-    internal_events::SinkRequestBuildError,
-    sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression},
-};
+use crate::sinks::{prelude::*, util::buffer::metrics::MetricNormalizer};
 
 use super::{
     encoder::AppsignalEncoder,
@@ -47,7 +36,7 @@ where
         })
         .batched(self.batch_settings.into_byte_size_config())
         .request_builder(
-            None,
+            default_request_builder_concurrency_limit(),
             AppsignalRequestBuilder {
                 compression: self.compression,
                 encoder: AppsignalEncoder {
diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs
index c39911a0705bc..61a1e539418aa 100644
--- a/src/sinks/aws_kinesis/sink.rs
+++ b/src/sinks/aws_kinesis/sink.rs
@@ -1,4 +1,4 @@
-use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize};
+use std::{borrow::Cow, fmt::Debug, marker::PhantomData};
 
 use lookup::lookup_v2::ConfigValuePath;
 use rand::random;
@@ -42,8 +42,6 @@ where
     R: Record + Send + Sync + Unpin + Clone + 'static,
 {
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let request_builder_concurrency_limit = NonZeroUsize::new(50);
-
         input
             .filter_map(|event| {
                 // Panic: This sink only accepts Logs, so this should never panic
@@ -52,7 +50,10 @@ where
                 future::ready(processed)
             })
-            .request_builder(request_builder_concurrency_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/aws_s_s/sink.rs b/src/sinks/aws_s_s/sink.rs
index b8bfa16da936c..41eb42f82e9f6 100644
--- a/src/sinks/aws_s_s/sink.rs
+++ b/src/sinks/aws_s_s/sink.rs
@@ -1,18 +1,6 @@
-use std::num::NonZeroUsize;
-
-use futures::stream::BoxStream;
-use futures_util::StreamExt;
-use vector_core::sink::StreamSink;
-
 use super::{client::Client, request_builder::SSRequestBuilder, service::SSService};
-use crate::internal_events::SinkRequestBuildError;
 use crate::sinks::aws_s_s::retry::SSRetryLogic;
-use crate::{
-    event::Event,
-    sinks::util::{
-        builder::SinkBuilderExt, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig,
-    },
-};
+use crate::sinks::prelude::*;
 
 #[derive(Clone, Copy, Debug, Default)]
 pub(crate) struct SqsSinkDefaultBatchSettings;
@@ -55,14 +43,16 @@ where
         let request = self
             .request
             .unwrap_with(&TowerRequestConfig::default().timeout_secs(30));
-        let request_builder_concurrency_limit = NonZeroUsize::new(50);
         let retry_logic: SSRetryLogic<E> = super::retry::SSRetryLogic::new();
         let service = tower::ServiceBuilder::new()
             .settings(request, retry_logic)
             .service(self.service);
 
         input
-            .request_builder(request_builder_concurrency_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|req| async move {
                 req.map_err(|error| {
                     emit!(SinkRequestBuildError { error });
diff --git a/src/sinks/azure_common/sink.rs b/src/sinks/azure_common/sink.rs
index c0655cc426d47..d46910f1e4d98 100644
--- a/src/sinks/azure_common/sink.rs
+++ b/src/sinks/azure_common/sink.rs
@@ -1,21 +1,6 @@
-use std::{fmt, num::NonZeroUsize};
+use std::fmt;
 
-use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures_util::StreamExt;
-use tower::Service;
-use vector_common::request_metadata::MetaDescriptive;
-use vector_core::{
-    event::Finalizable,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-};
-
-use crate::{
-    event::Event,
-    internal_events::SinkRequestBuildError,
-    sinks::util::{partitioner::KeyPartitioner, RequestBuilder, SinkBuilderExt},
-};
+use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner};
 
 pub struct AzureBlobSink<Svc, RB> {
     service: Svc,
@@ -54,7 +39,6 @@ where
         let partitioner = self.partitioner;
         let settings = self.batcher_settings;
 
-        let builder_limit = NonZeroUsize::new(64);
         let request_builder = self.request_builder;
 
         input
@@ -65,7 +49,7 @@ where
                 // that occurs.
                 key.map(move |k| (k, batch))
             })
-            .request_builder(builder_limit, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/azure_monitor_logs/sink.rs b/src/sinks/azure_monitor_logs/sink.rs
index 7b610cedebaa9..ce06e04bb83b8 100644
--- a/src/sinks/azure_monitor_logs/sink.rs
+++ b/src/sinks/azure_monitor_logs/sink.rs
@@ -41,7 +41,7 @@ where
         input
             .batched(self.batch_settings.into_byte_size_config())
             .request_builder(
-                None,
+                default_request_builder_concurrency_limit(),
                 AzureMonitorLogsRequestBuilder {
                     encoding: self.encoding,
                 },
diff --git a/src/sinks/clickhouse/sink.rs b/src/sinks/clickhouse/sink.rs
index df2261f1d6738..c951101c8ce21 100644
--- a/src/sinks/clickhouse/sink.rs
+++ b/src/sinks/clickhouse/sink.rs
@@ -49,7 +49,7 @@ impl ClickhouseSink {
             )
             .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
             .request_builder(
-                None,
+                default_request_builder_concurrency_limit(),
                 ClickhouseRequestBuilder {
                     compression: self.compression,
                     encoding: self.encoding,
diff --git a/src/sinks/databend/sink.rs b/src/sinks/databend/sink.rs
index b0e67b9ea2662..87b3d1155cc9d 100644
--- a/src/sinks/databend/sink.rs
+++ b/src/sinks/databend/sink.rs
@@ -1,14 +1,4 @@
-use std::num::NonZeroUsize;
-
-use futures_util::{stream::BoxStream, StreamExt};
-use vector_core::event::Event;
-use vector_core::sink::StreamSink;
-use vector_core::stream::BatcherSettings;
-
-use crate::{
-    internal_events::SinkRequestBuildError,
-    sinks::util::{service::Svc, SinkBuilderExt},
-};
+use crate::sinks::prelude::*;
 
 use super::request_builder::DatabendRequestBuilder;
 use super::service::{DatabendRetryLogic, DatabendService};
@@ -33,10 +23,12 @@ impl DatabendSink {
     }
 
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let builder_limit = NonZeroUsize::new(64);
         input
             .batched(self.batch_settings.into_byte_size_config())
-            .request_builder(builder_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/datadog/events/sink.rs b/src/sinks/datadog/events/sink.rs
index a85d23d829093..d917a44c51bce 100644
--- a/src/sinks/datadog/events/sink.rs
+++ b/src/sinks/datadog/events/sink.rs
@@ -1,17 +1,12 @@
-use std::{fmt, num::NonZeroUsize};
+use std::fmt;
 
-use async_trait::async_trait;
-use futures::{stream::BoxStream, StreamExt};
 use lookup::event_path;
-use tower::Service;
-use vector_core::stream::DriverResponse;
 
 use crate::{
-    event::Event,
-    internal_events::{ParserMissingFieldError, SinkRequestBuildError, DROP_EVENT},
+    internal_events::{ParserMissingFieldError, DROP_EVENT},
     sinks::{
         datadog::events::request_builder::{DatadogEventsRequest, DatadogEventsRequestBuilder},
-        util::{SinkBuilderExt, StreamSink},
+        prelude::*,
     },
 };
@@ -27,11 +22,12 @@ where
     S::Error: fmt::Debug + Into<crate::Error> + Send,
 {
     async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let concurrency_limit = NonZeroUsize::new(50);
-
         input
             .filter_map(ensure_required_fields)
-            .request_builder(concurrency_limit, DatadogEventsRequestBuilder::new())
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                DatadogEventsRequestBuilder::new(),
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs
index 33b823ad3b92c..363fa55383d2b 100644
--- a/src/sinks/datadog/logs/sink.rs
+++ b/src/sinks/datadog/logs/sink.rs
@@ -1,30 +1,14 @@
-use std::{fmt::Debug, io, num::NonZeroUsize, sync::Arc};
+use std::{fmt::Debug, io, sync::Arc};
 
-use async_trait::async_trait;
 use bytes::Bytes;
 use codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig};
-use futures::stream::{BoxStream, StreamExt};
 use lookup::event_path;
 use snafu::Snafu;
-use tower::Service;
-use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata};
-use vector_core::{
-    event::{Event, EventFinalizers, Finalizable, Value},
-    partition::Partitioner,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-};
 
 use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
-use crate::{
-    codecs::{Encoder, Transformer},
-    internal_events::SinkRequestBuildError,
-    sinks::util::{
-        encoding::{write_all, Encoder as _},
-        metadata::RequestMetadataBuilder,
-        request_builder::EncodeResult,
-        Compression, Compressor, RequestBuilder, SinkBuilderExt,
-    },
+use crate::sinks::{
+    prelude::*,
+    util::{encoding::Encoder as _, Compressor},
 };
 
 #[derive(Default)]
 struct EventPartitioner;
@@ -278,11 +262,10 @@ where
         let partitioner = EventPartitioner;
-        let builder_limit = NonZeroUsize::new(64);
         let input = input.batched_partitioned(partitioner, self.batch_settings);
         input
             .request_builder(
-                builder_limit,
+                default_request_builder_concurrency_limit(),
                 LogRequestBuilder {
                     default_api_key,
                     encoding: self.encoding,
diff --git a/src/sinks/elasticsearch/sink.rs b/src/sinks/elasticsearch/sink.rs
index 7800f4d975a91..a3168d5b44809 100644
--- a/src/sinks/elasticsearch/sink.rs
+++ b/src/sinks/elasticsearch/sink.rs
@@ -1,22 +1,15 @@
-use std::{fmt, num::NonZeroUsize};
+use std::fmt;
 
-use async_trait::async_trait;
-use futures::{future, stream::BoxStream, StreamExt};
 use lookup::lookup_v2::ConfigValuePath;
-use tower::Service;
-use vector_core::stream::{BatcherSettings, DriverResponse};
 use vrl::path::PathPrefix;
 
 use crate::{
-    codecs::Transformer,
-    event::{Event, LogEvent, Value},
-    internal_events::SinkRequestBuildError,
     sinks::{
         elasticsearch::{
             encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder,
             service::ElasticsearchRequest, BulkAction, ElasticsearchCommonMode,
         },
-        util::{SinkBuilderExt, StreamSink},
+        prelude::*,
     },
     transforms::metric_to_log::MetricToLog,
 };
@@ -67,8 +60,6 @@ where
     S::Error: fmt::Debug + Into<crate::Error> + Send,
 {
     pub async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let request_builder_concurrency_limit = NonZeroUsize::new(50);
-
         let mode = self.mode;
         let id_key_field = self.id_key_field.as_ref();
         let transformer = self.transformer.clone();
@@ -91,7 +82,10 @@ where
                 future::ready(process_log(log, &mode, id_key_field, &transformer))
             })
             .batched(self.batch_settings.into_byte_size_config())
-            .request_builder(request_builder_concurrency_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/gcp/stackdriver/logs/sink.rs b/src/sinks/gcp/stackdriver/logs/sink.rs
index c15ed0f342ce6..39870bca24432 100644
--- a/src/sinks/gcp/stackdriver/logs/sink.rs
+++ b/src/sinks/gcp/stackdriver/logs/sink.rs
@@ -41,7 +41,10 @@ where
                     .into_item_size_config(HttpJsonBatchSizer),
             )
-            // Build requests with no concurrency limit.
-            .request_builder(None, self.request_builder)
+            // Build requests with default concurrency limit.
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             // Filter out any errors that occurred in the request building.
             .filter_map(|request| async move {
                 match request {
diff --git a/src/sinks/gcs_common/sink.rs b/src/sinks/gcs_common/sink.rs
index 2f3991e5182d2..830dbd4ca6301 100644
--- a/src/sinks/gcs_common/sink.rs
+++ b/src/sinks/gcs_common/sink.rs
@@ -1,21 +1,6 @@
-use std::{fmt, num::NonZeroUsize};
+use std::fmt;
 
-use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures_util::StreamExt;
-use tower::Service;
-use vector_common::request_metadata::MetaDescriptive;
-use vector_core::{
-    event::Finalizable,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-};
-
-use crate::{
-    event::Event,
-    internal_events::SinkRequestBuildError,
-    sinks::util::{partitioner::KeyPartitioner, RequestBuilder, SinkBuilderExt},
-};
+use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner};
 
 pub struct GcsSink<Svc, RB> {
     service: Svc,
@@ -57,7 +42,6 @@ where
         let partitioner = self.partitioner;
         let settings = self.batcher_settings;
 
-        let builder_limit = NonZeroUsize::new(64);
         let request_builder = self.request_builder;
 
         input
@@ -67,7 +51,7 @@ where
                 // thus no further `EventsDropped` event needs emitting at this stage.
                 key.map(move |k| (k, batch))
             })
-            .request_builder(builder_limit, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/honeycomb/sink.rs b/src/sinks/honeycomb/sink.rs
index 9575577b91e11..235dcd6eb4893 100644
--- a/src/sinks/honeycomb/sink.rs
+++ b/src/sinks/honeycomb/sink.rs
@@ -40,8 +40,11 @@ where
             self.batch_settings
                 .into_item_size_config(HttpJsonBatchSizer),
         )
-        // Build requests with no concurrency limit.
-        .request_builder(None, self.request_builder)
+        // Build requests with default concurrency limit.
+        .request_builder(
+            default_request_builder_concurrency_limit(),
+            self.request_builder,
+        )
         // Filter out any errors that occurred in the request building.
         .filter_map(|request| async move {
             match request {
diff --git a/src/sinks/http/sink.rs b/src/sinks/http/sink.rs
index ee997cd82ad33..8427201b31662 100644
--- a/src/sinks/http/sink.rs
+++ b/src/sinks/http/sink.rs
@@ -36,8 +36,11 @@ where
         .batched(self.batch_settings.into_item_size_config(HttpBatchSizer {
             encoder: self.request_builder.encoder.encoder.clone(),
         }))
-        // Build requests with no concurrency limit.
-        .request_builder(None, self.request_builder)
+        // Build requests with default concurrency limit.
+        .request_builder(
+            default_request_builder_concurrency_limit(),
+            self.request_builder,
+        )
         // Filter out any errors that occurred in the request building.
         .filter_map(|request| async move {
             match request {
diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs
index 8a981a987c79a..141c32f7cb3b7 100644
--- a/src/sinks/kafka/sink.rs
+++ b/src/sinks/kafka/sink.rs
@@ -1,5 +1,3 @@
-use std::num::NonZeroUsize;
-
 use rdkafka::{
     consumer::{BaseConsumer, Consumer},
     error::KafkaError,
@@ -68,7 +66,6 @@ impl KafkaSink {
         // 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying
         // buffer is full.
         let service = ConcurrencyLimit::new(self.service.clone(), 64);
-        let builder_limit = NonZeroUsize::new(64);
 
         let request_builder = KafkaRequestBuilder {
             key_field: self.key_field,
@@ -93,7 +90,7 @@ impl KafkaSink {
                     .map(|topic| (topic, event)),
                 )
             })
-            .request_builder(builder_limit, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs
index 0d9cb9fbb43e1..89ae9857ee746 100644
--- a/src/sinks/loki/sink.rs
+++ b/src/sinks/loki/sink.rs
@@ -446,7 +446,7 @@ impl LokiSink {
         // out_of_order_action's that require a complete ordering are limited to building 1 request
         // at a time
         let request_builder_concurrency = match self.out_of_order_action {
-            OutOfOrderAction::Accept => NonZeroUsize::new(50).expect("static"),
+            OutOfOrderAction::Accept => default_request_builder_concurrency_limit(),
             OutOfOrderAction::Drop | OutOfOrderAction::RewriteTimestamp => {
                 NonZeroUsize::new(1).expect("static")
             }
@@ -479,7 +479,7 @@ impl LokiSink {
                     None
                 }
             })
-            .request_builder(Some(request_builder_concurrency), self.request_builder)
+            .request_builder(request_builder_concurrency, self.request_builder)
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs
index 97fc747dc2f89..f6bea42919e9d 100644
--- a/src/sinks/nats/sink.rs
+++ b/src/sinks/nats/sink.rs
@@ -79,7 +79,7 @@ impl NatsSink {
         input
             .filter_map(|event| std::future::ready(self.make_nats_event(event)))
-            .request_builder(None, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async move {
                 match request {
                     Err(e) => {
diff --git a/src/sinks/new_relic/sink.rs b/src/sinks/new_relic/sink.rs
index 384a1b7a54e52..fd93c90bbd61b 100644
--- a/src/sinks/new_relic/sink.rs
+++ b/src/sinks/new_relic/sink.rs
@@ -1,4 +1,4 @@
-use std::{fmt::Debug, num::NonZeroUsize, sync::Arc};
+use std::{fmt::Debug, sync::Arc};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -115,7 +115,6 @@ where
     S::Error: Debug + Into<crate::Error> + Send,
 {
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let builder_limit = NonZeroUsize::new(64);
         let request_builder = NewRelicRequestBuilder {
             encoder: self.encoder,
             compression: self.compression,
@@ -125,7 +124,7 @@ where
         input
             .batched(self.batcher_settings.into_byte_size_config())
-            .request_builder(builder_limit, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(
                 |request: Result<NewRelicApiRequest, NewRelicSinkError>| async move {
                     match request {
diff --git a/src/sinks/opendal_common.rs b/src/sinks/opendal_common.rs
index f8e5877b8e1ed..3ff05f214f1c7 100644
--- a/src/sinks/opendal_common.rs
+++ b/src/sinks/opendal_common.rs
@@ -8,38 +8,15 @@
 //! - Error handling
 //! - Limitation
 
-use std::{fmt, num::NonZeroUsize, task::Poll};
+use std::{fmt, task::Poll};
 
 use bytes::Bytes;
 use codecs::encoding::Framer;
-use futures::{stream::BoxStream, StreamExt};
 use opendal::Operator;
 use snafu::Snafu;
-use tower::Service;
 use tracing::Instrument;
-use vector_common::{
-    finalization::{EventStatus, Finalizable},
-    json_size::JsonSize,
-    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
-};
-use vector_core::{
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-    EstimatedJsonEncodedSizeOf,
-};
-
-use crate::{
-    codecs::{Encoder, Transformer},
-    event::{Event, EventFinalizers},
-    internal_events::SinkRequestBuildError,
-    sinks::{
-        util::{
-            metadata::RequestMetadataBuilder, partitioner::KeyPartitioner,
-            request_builder::EncodeResult, Compression, RequestBuilder, SinkBuilderExt,
-        },
-        BoxFuture,
-    },
-};
+
+use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner};
 
 /// OpenDalSink provides a generic service upon OpenDAL.
 ///
@@ -98,7 +75,6 @@ where
         let partitioner = self.partitioner;
         let settings = self.batcher_settings;
 
-        let builder_limit = NonZeroUsize::new(64);
         let request_builder = self.request_builder;
 
         input
@@ -109,7 +85,7 @@ where
                 // that occurs.
                 key.map(move |k| (k, batch))
             })
-            .request_builder(builder_limit, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs
index ecc62097aaef2..d118545537604 100644
--- a/src/sinks/prelude.rs
+++ b/src/sinks/prelude.rs
@@ -16,7 +16,7 @@ pub use crate::{
     builder::SinkBuilderExt,
     encoding::{self, write_all},
     metadata::RequestMetadataBuilder,
-    request_builder::EncodeResult,
+    request_builder::{default_request_builder_concurrency_limit, EncodeResult},
     retries::{RetryAction, RetryLogic},
     service::{ServiceBuilderExt, Svc},
     BatchConfig, Compression, Concurrency, NoDefaultsBatchSettings, RequestBuilder,
diff --git a/src/sinks/pulsar/sink.rs b/src/sinks/pulsar/sink.rs
index 8644aa561dc58..7bd26ac24f5a8 100644
--- a/src/sinks/pulsar/sink.rs
+++ b/src/sinks/pulsar/sink.rs
@@ -112,7 +112,7 @@ impl PulsarSink {
                     event,
                 ))
             })
-            .request_builder(None, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async move {
                 request
                     .map_err(|e| error!("Failed to build Pulsar request: {:?}.", e))
diff --git a/src/sinks/s3_common/sink.rs b/src/sinks/s3_common/sink.rs
index 08e33674a02ad..c26f6f93e8a0f 100644
--- a/src/sinks/s3_common/sink.rs
+++ b/src/sinks/s3_common/sink.rs
@@ -1,21 +1,6 @@
-use std::{fmt, num::NonZeroUsize};
+use std::fmt;
 
-use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures_util::StreamExt;
-use tower::Service;
-use vector_common::request_metadata::MetaDescriptive;
-use vector_core::{
-    event::Finalizable,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-};
-
-use crate::internal_events::SinkRequestBuildError;
-use crate::{
-    event::Event,
-    sinks::util::{RequestBuilder, SinkBuilderExt},
-};
+use crate::sinks::prelude::*;
 
 use super::partitioner::{S3KeyPartitioner, S3PartitionKey};
 
@@ -56,13 +41,12 @@ where
         let partitioner = self.partitioner;
         let settings = self.batcher_settings;
 
-        let builder_limit = NonZeroUsize::new(64);
         let request_builder = self.request_builder;
 
         input
             .batched_partitioned(partitioner, settings)
             .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
-            .request_builder(builder_limit, request_builder)
+            .request_builder(default_request_builder_concurrency_limit(), request_builder)
             .filter_map(|request| async move {
                 match request {
                     Err(error) => {
diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs
index 2533add3438a2..555a3d9b95076 100644
--- a/src/sinks/splunk_hec/logs/sink.rs
+++ b/src/sinks/splunk_hec/logs/sink.rs
@@ -1,32 +1,19 @@
-use std::{fmt, num::NonZeroUsize, sync::Arc};
+use std::{fmt, sync::Arc};
 
-use async_trait::async_trait;
-use futures_util::{stream::BoxStream, StreamExt};
 use serde::Serialize;
-use tower::Service;
-use vector_buffers::EventCount;
-use vector_core::{
-    event::{Event, LogEvent, Value},
-    partition::Partitioner,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-    ByteSizeOf,
-};
 
 use super::request_builder::HecLogsRequestBuilder;
 use crate::{
-    config::SinkContext,
     internal_events::SplunkEventTimestampInvalidType,
     internal_events::SplunkEventTimestampMissing,
-    internal_events::TemplateRenderingError,
     sinks::{
+        prelude::*,
         splunk_hec::common::{
            render_template_string, request::HecRequest, EndpointTarget, INDEX_FIELD,
            SOURCETYPE_FIELD, SOURCE_FIELD,
         },
-        util::{processed_event::ProcessedEvent, SinkBuilderExt},
+        util::processed_event::ProcessedEvent,
     },
-    template::Template,
 };
 
 use lookup::{event_path, OwnedValuePath, PathPrefix};
@@ -64,8 +51,6 @@ where
     S::Error: fmt::Debug + Into<crate::Error> + Send,
 {
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        let builder_limit = NonZeroUsize::new(64);
-
         let data = HecLogData {
             sourcetype: self.sourcetype.as_ref(),
             source: self.source.as_ref(),
@@ -94,7 +79,10 @@ where
             },
             self.batch_settings,
         )
-        .request_builder(builder_limit, self.request_builder)
+        .request_builder(
+            default_request_builder_concurrency_limit(),
+            self.request_builder,
+        )
         .filter_map(|request| async move {
             match request {
                 Err(e) => {
diff --git a/src/sinks/splunk_hec/metrics/sink.rs b/src/sinks/splunk_hec/metrics/sink.rs
index f41e397a7acac..ef42d21b60370 100644
--- a/src/sinks/splunk_hec/metrics/sink.rs
+++ b/src/sinks/splunk_hec/metrics/sink.rs
@@ -1,28 +1,17 @@
-use std::{fmt, num::NonZeroUsize, sync::Arc};
+use std::{fmt, sync::Arc};
 
-use async_trait::async_trait;
-use futures_util::{future, stream::BoxStream, StreamExt};
 use serde::Serialize;
-use tower::Service;
-use vector_buffers::EventCount;
-use vector_core::{
-    event::{Event, Metric, MetricValue},
-    partition::Partitioner,
-    sink::StreamSink,
-    stream::{BatcherSettings, DriverResponse},
-    ByteSizeOf,
-};
+use vector_core::event::{Metric, MetricValue};
 use vrl::path::OwnedValuePath;
 
 use super::request_builder::HecMetricsRequestBuilder;
 use crate::{
-    config::SinkContext,
     internal_events::SplunkInvalidMetricReceivedError,
     sinks::{
+        prelude::*,
         splunk_hec::common::{render_template_string, request::HecRequest},
-        util::{encode_namespace, processed_event::ProcessedEvent, SinkBuilderExt},
+        util::{encode_namespace, processed_event::ProcessedEvent},
     },
-    template::Template,
 };
 
 pub struct HecMetricsSink<S> {
@@ -51,7 +40,6 @@ where
         let host_key = self.host_key.as_ref();
         let default_namespace = self.default_namespace.as_deref();
 
-        let builder_limit = NonZeroUsize::new(64);
         input
             .map(|event| (event.size_of(), event.into_metric()))
             .filter_map(move |(event_byte_size, metric)| {
@@ -66,7 +54,10 @@ where
                 ))
             })
             .batched_partitioned(EventPartitioner, self.batch_settings)
-            .request_builder(builder_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(e) => {
diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs
index 0d7af1635a4b7..c51bf405dc4d1 100644
--- a/src/sinks/util/builder.rs
+++ b/src/sinks/util/builder.rs
@@ -94,16 +94,15 @@ pub trait SinkBuilderExt: Stream {
     /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to
     /// downstream services.
     ///
-    /// Each input is transformed concurrently, up to the given limit. A limit of `None` is
-    /// self-describing, as it imposes no concurrency limit, and `Some(n)` limits this stage to `n`
-    /// concurrent operations at any given time.
+    /// Each input is transformed concurrently, up to the given limit. A limit of `n` restricts
+    /// this stage to `n` concurrent operations at any given time.
     ///
     /// Encoding and compression are handled internally, deferring to the builder at the necessary
     /// checkpoints for adjusting the event before encoding/compression, as well as generating the
     /// correct request object with the result of encoding/compressing the events.
     fn request_builder<B>(
         self,
-        limit: Option<NonZeroUsize>,
+        limit: NonZeroUsize,
         builder: B,
     ) -> ConcurrentMap<Self, Result<B::Request, B::Error>>
     where
@@ -115,7 +114,7 @@ pub trait SinkBuilderExt: Stream {
     {
         let builder = Arc::new(builder);
 
-        self.concurrent_map(limit, move |input| {
+        self.concurrent_map(Some(limit), move |input| {
             let builder = Arc::clone(&builder);
 
             Box::pin(async move {
diff --git a/src/sinks/util/request_builder.rs b/src/sinks/util/request_builder.rs
index 86501f3a96a04..5ee5ea62423b3 100644
--- a/src/sinks/util/request_builder.rs
+++ b/src/sinks/util/request_builder.rs
@@ -1,10 +1,24 @@
-use std::io;
+use std::{io, num::NonZeroUsize};
 
 use bytes::Bytes;
 use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata};
 
 use super::{encoding::Encoder, metadata::RequestMetadataBuilder, Compression, Compressor};
 
+pub fn default_request_builder_concurrency_limit() -> NonZeroUsize {
+    if let Some(limit) = std::env::var("VECTOR_EXPERIMENTAL_REQUEST_BUILDER_CONCURRENCY")
+        .map(|value| value.parse::<NonZeroUsize>().ok())
+        .ok()
+        .flatten()
+    {
+        return limit;
+    }
+
+    crate::app::WORKER_THREADS
+        .get()
+        .unwrap_or_else(|| NonZeroUsize::new(8).expect("static"))
+}
+
 pub struct EncodeResult<P> {
     pub payload: P,
     pub uncompressed_byte_size: usize,
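
For reference, the resolution order implemented by the new helper (environment override first, then the worker-thread count recorded in crate::app::WORKER_THREADS at runtime construction, then a static fallback of 8) can be exercised in isolation. The sketch below is illustrative only; resolve_limit and its parameters are hypothetical stand-ins for the lookups in the patch, not part of it:

use std::num::NonZeroUsize;

// Hypothetical stand-in for `default_request_builder_concurrency_limit()`:
// prefer the parsed env override, then the recorded worker-thread count,
// then the static fallback of 8.
fn resolve_limit(
    env_override: Option<&str>,
    worker_threads: Option<NonZeroUsize>,
) -> NonZeroUsize {
    env_override
        .and_then(|value| value.parse::<NonZeroUsize>().ok())
        .or(worker_threads)
        .unwrap_or_else(|| NonZeroUsize::new(8).expect("static"))
}

fn main() {
    // A parseable override wins over the worker-thread count.
    assert_eq!(resolve_limit(Some("4"), NonZeroUsize::new(16)).get(), 4);
    // An unparseable override (including "0") falls through to the worker-thread count.
    assert_eq!(resolve_limit(Some("0"), NonZeroUsize::new(16)).get(), 16);
    // With neither source available, the fallback of 8 applies.
    assert_eq!(resolve_limit(None, None).get(), 8);
}

One behavioral consequence worth noting: because the patch parses the variable with parse::<NonZeroUsize>(), setting VECTOR_EXPERIMENTAL_REQUEST_BUILDER_CONCURRENCY=0 does not disable the limit; the value simply fails to parse and resolution falls through to the worker-thread count.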