From 4e8de9c182e7af880b1a5a6b780b281e970b185a Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Thu, 21 Sep 2023 13:50:26 -0400 Subject: [PATCH 01/10] fix(kafka sink): performance improvements and fix memory leak --- src/sinks/kafka/config.rs | 124 ++++++++++++++--------------- src/sinks/kafka/request_builder.rs | 77 +++++++++--------- src/sinks/kafka/service.rs | 24 +++--- src/sinks/kafka/sink.rs | 52 ++++++++---- 4 files changed, 146 insertions(+), 131 deletions(-) diff --git a/src/sinks/kafka/config.rs b/src/sinks/kafka/config.rs index 692718b9096de..487b572d73367 100644 --- a/src/sinks/kafka/config.rs +++ b/src/sinks/kafka/config.rs @@ -17,8 +17,6 @@ use crate::{ }, }; -pub(crate) const QUEUED_MIN_MESSAGES: u64 = 100000; - /// Configuration for the `kafka` sink. #[serde_as] #[configurable_component(sink( @@ -159,79 +157,73 @@ impl KafkaSinkConfig { self.auth.apply(&mut client_config)?; - match kafka_role { - // All batch options are producer only. - KafkaRole::Producer => { - client_config - .set("compression.codec", &to_string(self.compression)) - .set( - "message.timeout.ms", - &self.message_timeout_ms.as_millis().to_string(), - ); - - if let Some(value) = self.batch.timeout_secs { - // Delay in milliseconds to wait for messages in the producer queue to accumulate before - // constructing message batches (MessageSets) to transmit to brokers. A higher value - // allows larger and more effective (less overhead, improved compression) batches of - // messages to accumulate at the expense of increased message delivery latency. - // Type: float - let key = "queue.buffering.max.ms"; - if let Some(val) = self.librdkafka_options.get(key) { - return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\ + // All batch options are producer only. + if kafka_role == KafkaRole::Producer { + client_config + .set("compression.codec", &to_string(self.compression)) + .set( + "message.timeout.ms", + &self.message_timeout_ms.as_millis().to_string(), + ); + + if let Some(value) = self.batch.timeout_secs { + // Delay in milliseconds to wait for messages in the producer queue to accumulate before + // constructing message batches (MessageSets) to transmit to brokers. A higher value + // allows larger and more effective (less overhead, improved compression) batches of + // messages to accumulate at the expense of increased message delivery latency. + // Type: float + let key = "queue.buffering.max.ms"; + if let Some(val) = self.librdkafka_options.get(key) { + return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{}={}`.\ The config already sets this as `librdkafka_options.queue.buffering.max.ms={}`.\ Please delete one.", key, value, val).into()); - } - debug!( - librdkafka_option = key, - batch_option = "timeout_secs", - value, - "Applying batch option as librdkafka option." - ); - client_config.set(key, &((value * 1000.0).round().to_string())); } - if let Some(value) = self.batch.max_events { - // Maximum number of messages batched in one MessageSet. The total MessageSet size is - // also limited by batch.size and message.max.bytes. - // Type: integer - let key = "batch.num.messages"; - if let Some(val) = self.librdkafka_options.get(key) { - return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\ + debug!( + librdkafka_option = key, + batch_option = "timeout_secs", + value, + "Applying batch option as librdkafka option." 
+ ); + client_config.set(key, &((value * 1000.0).round().to_string())); + } + if let Some(value) = self.batch.max_events { + // Maximum number of messages batched in one MessageSet. The total MessageSet size is + // also limited by batch.size and message.max.bytes. + // Type: integer + let key = "batch.num.messages"; + if let Some(val) = self.librdkafka_options.get(key) { + return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{}={}`.\ The config already sets this as `librdkafka_options.batch.num.messages={}`.\ Please delete one.", key, value, val).into()); - } - debug!( - librdkafka_option = key, - batch_option = "max_events", - value, - "Applying batch option as librdkafka option." - ); - client_config.set(key, &value.to_string()); } - if let Some(value) = self.batch.max_bytes { - // Maximum size (in bytes) of all messages batched in one MessageSet, including protocol - // framing overhead. This limit is applied after the first message has been added to the - // batch, regardless of the first message's size, this is to ensure that messages that - // exceed batch.size are produced. The total MessageSet size is also limited by - // batch.num.messages and message.max.bytes. - // Type: integer - let key = "batch.size"; - if let Some(val) = self.librdkafka_options.get(key) { - return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\ + debug!( + librdkafka_option = key, + batch_option = "max_events", + value, + "Applying batch option as librdkafka option." + ); + client_config.set(key, &value.to_string()); + } + if let Some(value) = self.batch.max_bytes { + // Maximum size (in bytes) of all messages batched in one MessageSet, including protocol + // framing overhead. This limit is applied after the first message has been added to the + // batch, regardless of the first message's size, this is to ensure that messages that + // exceed batch.size are produced. The total MessageSet size is also limited by + // batch.num.messages and message.max.bytes. + // Type: integer + let key = "batch.size"; + if let Some(val) = self.librdkafka_options.get(key) { + return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{}={}`.\ The config already sets this as `librdkafka_options.batch.size={}`.\ Please delete one.", key, value, val).into()); - } - debug!( - librdkafka_option = key, - batch_option = "max_bytes", - value, - "Applying batch option as librdkafka option." - ); - client_config.set(key, &value.to_string()); } - } - - KafkaRole::Consumer => { - client_config.set("queued.min.messages", QUEUED_MIN_MESSAGES.to_string()); + debug!( + librdkafka_option = key, + batch_option = "max_bytes", + value, + "Applying batch option as librdkafka option." 
+            );
+            client_config.set(key, &value.to_string());
        }
    }

diff --git a/src/sinks/kafka/request_builder.rs b/src/sinks/kafka/request_builder.rs
index 3a8ce886ad085..7a7693a35b032 100644
--- a/src/sinks/kafka/request_builder.rs
+++ b/src/sinks/kafka/request_builder.rs
@@ -1,42 +1,43 @@
-use std::num::NonZeroUsize;
-
-use bytes::{Bytes, BytesMut};
+use bytes::Bytes;
+use lookup::OwnedTargetPath;
 use rdkafka::message::{Header, OwnedHeaders};
-use tokio_util::codec::Encoder as _;
-use vrl::path::OwnedTargetPath;
 
 use crate::{
-    codecs::{Encoder, Transformer},
-    event::{Event, Finalizable, Value},
-    internal_events::{KafkaHeaderExtractionError, TemplateRenderingError},
+    internal_events::KafkaHeaderExtractionError,
     sinks::{
         kafka::service::{KafkaRequest, KafkaRequestMetadata},
-        util::metadata::RequestMetadataBuilder,
+        prelude::*,
     },
-    template::Template,
 };
 
 pub struct KafkaRequestBuilder {
     pub key_field: Option<OwnedTargetPath>,
     pub headers_key: Option<OwnedTargetPath>,
-    pub topic_template: Template,
-    pub transformer: Transformer,
-    pub encoder: Encoder<()>,
+    pub encoder: (Transformer, Encoder<()>),
 }
 
-impl KafkaRequestBuilder {
-    pub fn build_request(&mut self, mut event: Event) -> Option<KafkaRequest> {
-        let topic = self
-            .topic_template
-            .render_string(&event)
-            .map_err(|error| {
-                emit!(TemplateRenderingError {
-                    field: None,
-                    drop_event: true,
-                    error,
-                });
-            })
-            .ok()?;
+impl RequestBuilder<(String, Event)> for KafkaRequestBuilder {
+    type Metadata = KafkaRequestMetadata;
+    type Events = Event;
+    type Encoder = (Transformer, Encoder<()>);
+    type Payload = Bytes;
+    type Request = KafkaRequest;
+    type Error = std::io::Error;
+
+    fn compression(&self) -> Compression {
+        Compression::None
+    }
+
+    fn encoder(&self) -> &Self::Encoder {
+        &self.encoder
+    }
+
+    fn split_input(
+        &self,
+        input: (String, Event),
+    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
+        let (topic, mut event) = input;
+        let builder = RequestMetadataBuilder::from_event(&event);
 
         let metadata = KafkaRequestMetadata {
             finalizers: event.take_finalizers(),
@@ -45,23 +46,21 @@
             headers: get_headers(&event, self.headers_key.as_ref()),
             topic,
         };
-        self.transformer.transform(&mut event);
-        let mut body = BytesMut::new();
-
-        // Ensure the metadata builder is built after transforming the event so we have the event
-        // size taking into account any dropped fields.
-        let metadata_builder = RequestMetadataBuilder::from_event(&event);
 
-        self.encoder.encode(event, &mut body).ok()?;
-        let body = body.freeze();
-        let bytes_len = NonZeroUsize::new(body.len()).expect("payload should never be zero length");
-        let request_metadata = metadata_builder.with_request_size(bytes_len);
+        (metadata, builder, event)
+    }
 
-        Some(KafkaRequest {
-            body,
+    fn build_request(
+        &self,
+        metadata: Self::Metadata,
+        request_metadata: RequestMetadata,
+        payload: EncodeResult<Self::Payload>,
+    ) -> Self::Request {
+        KafkaRequest {
+            body: payload.into_payload(),
             metadata,
             request_metadata,
-        })
+        }
     }
 }

diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs
index 299a9de4ee056..5dcdad7cb171c 100644
--- a/src/sinks/kafka/service.rs
+++ b/src/sinks/kafka/service.rs
@@ -7,9 +7,6 @@ use rdkafka::{
     producer::{FutureProducer, FutureRecord},
     util::Timeout,
 };
-use vector_core::internal_event::{
-    ByteSize, BytesSent, InternalEventHandle as _, Protocol, Registered,
-};
 
 use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};
 
@@ -29,6 +26,7 @@ pub struct KafkaRequestMetadata {
 
 pub struct KafkaResponse {
     event_byte_size: GroupedCountByteSize,
+    raw_byte_size: usize,
 }
 
 impl DriverResponse for KafkaResponse {
@@ -39,6 +37,10 @@
     fn events_sent(&self) -> &GroupedCountByteSize {
         &self.event_byte_size
     }
+
+    fn bytes_sent(&self) -> Option<usize> {
+        Some(self.raw_byte_size)
+    }
 }
 
 impl Finalizable for KafkaRequest {
@@ -60,15 +62,11 @@ impl MetaDescriptive for KafkaRequest {
 #[derive(Clone)]
 pub struct KafkaService {
     kafka_producer: FutureProducer<KafkaStatisticsContext>,
-    bytes_sent: Registered<BytesSent>,
 }
 
 impl KafkaService {
     pub(crate) fn new(kafka_producer: FutureProducer<KafkaStatisticsContext>) -> KafkaService {
-        KafkaService {
-            kafka_producer,
-            bytes_sent: register!(BytesSent::from(Protocol("kafka".into()))),
-        }
+        KafkaService { kafka_producer }
     }
 }
 
@@ -104,10 +102,12 @@ impl Service<KafkaRequest> for KafkaService {
         // rdkafka will internally retry forever if the queue is full
         match this.kafka_producer.send(record, Timeout::Never).await {
             Ok((_partition, _offset)) => {
-                this.bytes_sent.emit(ByteSize(
-                    request.body.len() + request.metadata.key.map(|x| x.len()).unwrap_or(0),
-                ));
-                Ok(KafkaResponse { event_byte_size })
+                let raw_byte_size =
+                    request.body.len() + request.metadata.key.map(|x| x.len()).unwrap_or(0);
+                Ok(KafkaResponse {
+                    event_byte_size,
+                    raw_byte_size,
+                })
             }
             Err((kafka_err, _original_record)) => Err(kafka_err),
         }
diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs
index f2d0107524310..8a981a987c79a 100644
--- a/src/sinks/kafka/sink.rs
+++ b/src/sinks/kafka/sink.rs
@@ -1,4 +1,5 @@
-use futures::future;
+use std::num::NonZeroUsize;
+
 use rdkafka::{
     consumer::{BaseConsumer, Consumer},
     error::KafkaError,
@@ -13,9 +14,7 @@ use vrl::path::OwnedTargetPath;
 
 use super::config::{KafkaRole, KafkaSinkConfig};
 use crate::{
     kafka::KafkaStatisticsContext,
-    sinks::kafka::{
-        config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder, service::KafkaService,
-    },
+    sinks::kafka::{request_builder::KafkaRequestBuilder, service::KafkaService},
     sinks::prelude::*,
 };
 
@@ -65,22 +64,47 @@ impl KafkaSink {
     }
 
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        // rdkafka will internally retry forever, so we need some limit to prevent this from overflowing
-        let service = ConcurrencyLimit::new(self.service, QUEUED_MIN_MESSAGES as usize);
-        let mut request_builder = KafkaRequestBuilder {
+        // rdkafka will internally retry forever, so we need some limit to prevent this from
+        // overflowing.
+        // 64 should be plenty of concurrency here, as an rdkafka send operation does not block
+        // until its underlying buffer is full.
+        let service = ConcurrencyLimit::new(self.service.clone(), 64);
+        let builder_limit = NonZeroUsize::new(64);
+
+        let request_builder = KafkaRequestBuilder {
             key_field: self.key_field,
             headers_key: self.headers_key,
-            topic_template: self.topic,
-            transformer: self.transformer,
-            encoder: self.encoder,
+            encoder: (self.transformer, self.encoder),
         };
 
         input
-            .filter_map(|event|
-                // request_builder is fallible but the places it can fail are emitting
-                // `Error` and `DroppedEvent` internal events appropriately so no need to here.
-                future::ready(request_builder.build_request(event)))
+            .filter_map(|event| {
+                // Compute the topic.
+                future::ready(
+                    self.topic
+                        .render_string(&event)
+                        .map_err(|error| {
+                            emit!(TemplateRenderingError {
+                                field: None,
+                                drop_event: true,
+                                error,
+                            });
+                        })
+                        .ok()
+                        .map(|topic| (topic, event)),
+                )
+            })
+            .request_builder(builder_limit, request_builder)
+            .filter_map(|request| async {
+                match request {
+                    Err(error) => {
+                        emit!(SinkRequestBuildError { error });
+                        None
+                    }
+                    Ok(req) => Some(req),
+                }
+            })
             .into_driver(service)
+            .protocol("kafka")
             .run()
             .await
     }

From 134fb9be270c6a0f302d8719a89c2148cbe991f7 Mon Sep 17 00:00:00 2001
From: Doug Smith
Date: Thu, 21 Sep 2023 14:48:56 -0400
Subject: [PATCH 02/10] clippy

---
 src/sinks/kafka/service.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs
index 5dcdad7cb171c..9211c316af6e7 100644
--- a/src/sinks/kafka/service.rs
+++ b/src/sinks/kafka/service.rs
@@ -65,7 +65,9 @@ pub struct KafkaService {
 }
 
 impl KafkaService {
-    pub(crate) fn new(kafka_producer: FutureProducer<KafkaStatisticsContext>) -> KafkaService {
+    pub(crate) const fn new(
+        kafka_producer: FutureProducer<KafkaStatisticsContext>,
+    ) -> KafkaService {
         KafkaService { kafka_producer }
     }
 }

From 4416c6b3ced39481c9f44094653d78f3a029d8fe Mon Sep 17 00:00:00 2001
From: Doug Smith
Date: Thu, 21 Sep 2023 15:57:17 -0400
Subject: [PATCH 03/10] fix(sinks): always set a request builder concurrency limit

---
 src/sinks/amqp/sink.rs | 2 +-
 src/sinks/appsignal/sink.rs | 15 ++----------
 src/sinks/aws_kinesis/sink.rs | 9 ++++----
 src/sinks/aws_s_s/sink.rs | 20 ++++------------
 src/sinks/azure_common/sink.rs | 22 +++---------------
 src/sinks/azure_monitor_logs/sink.rs | 2 +-
 src/sinks/clickhouse/sink.rs | 2 +-
 src/sinks/databend/sink.rs | 18 ++++-----------
 src/sinks/datadog/events/sink.rs | 18 ++++++---------
 src/sinks/datadog/logs/sink.rs | 27 ++++------------------
 src/sinks/elasticsearch/sink.rs | 18 +++++-----------
 src/sinks/gcp/stackdriver/logs/sink.rs | 5 +++-
 src/sinks/gcs_common/sink.rs | 22 +++---------------
 src/sinks/honeycomb/sink.rs | 4 ++--
 src/sinks/http/sink.rs | 4 ++--
 src/sinks/kafka/sink.rs | 5 +---
 src/sinks/loki/sink.rs | 4 ++--
 src/sinks/nats/sink.rs | 2 +-
 src/sinks/new_relic/sink.rs | 5 ++--
 src/sinks/opendal_common.rs | 32 ++++----------------------
 src/sinks/prelude.rs | 2 +-
 src/sinks/pulsar/sink.rs | 2 +-
 src/sinks/s3_common/sink.rs | 22 +++---------------
 src/sinks/splunk_hec/logs/sink.rs | 26 ++++++---------------
 src/sinks/splunk_hec/metrics/sink.rs | 25 +++++++-------------
 src/sinks/util/builder.rs | 9 ++++----
 src/sinks/util/request_builder.rs | 10 +++++++-
 27 files changed, 95 insertions(+), 237 deletions(-)

diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs
index 287b002b935f2..7039c6217993d 100644
--- a/src/sinks/amqp/sink.rs
+++
b/src/sinks/amqp/sink.rs @@ -119,7 +119,7 @@ impl AmqpSink { input .filter_map(|event| std::future::ready(self.make_amqp_event(event))) - .request_builder(None, request_builder) + .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { match request { Err(e) => { diff --git a/src/sinks/appsignal/sink.rs b/src/sinks/appsignal/sink.rs index b908ed757a45a..1ad0e3e0c8779 100644 --- a/src/sinks/appsignal/sink.rs +++ b/src/sinks/appsignal/sink.rs @@ -1,17 +1,6 @@ -use futures::{stream::BoxStream, StreamExt}; use futures_util::future::ready; -use tower::{Service, ServiceBuilder}; -use vector_core::{ - event::Event, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, -}; -use crate::{ - codecs::Transformer, - internal_events::SinkRequestBuildError, - sinks::util::{buffer::metrics::MetricNormalizer, builder::SinkBuilderExt, Compression}, -}; +use crate::sinks::{prelude::*, util::buffer::metrics::MetricNormalizer}; use super::{ encoder::AppsignalEncoder, @@ -47,7 +36,7 @@ where }) .batched(self.batch_settings.into_byte_size_config()) .request_builder( - None, + default_request_builder_concurrency_limit(), AppsignalRequestBuilder { compression: self.compression, encoder: AppsignalEncoder { diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index c39911a0705bc..61a1e539418aa 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -1,4 +1,4 @@ -use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize}; +use std::{borrow::Cow, fmt::Debug, marker::PhantomData}; use lookup::lookup_v2::ConfigValuePath; use rand::random; @@ -42,8 +42,6 @@ where R: Record + Send + Sync + Unpin + Clone + 'static, { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let request_builder_concurrency_limit = NonZeroUsize::new(50); - input .filter_map(|event| { // Panic: This sink only accepts Logs, so this should never panic @@ -52,7 +50,10 @@ where future::ready(processed) }) - .request_builder(request_builder_concurrency_limit, self.request_builder) + .request_builder( + default_request_builder_concurrency_limit(), + self.request_builder, + ) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/aws_s_s/sink.rs b/src/sinks/aws_s_s/sink.rs index b8bfa16da936c..41eb42f82e9f6 100644 --- a/src/sinks/aws_s_s/sink.rs +++ b/src/sinks/aws_s_s/sink.rs @@ -1,18 +1,6 @@ -use std::num::NonZeroUsize; - -use futures::stream::BoxStream; -use futures_util::StreamExt; -use vector_core::sink::StreamSink; - use super::{client::Client, request_builder::SSRequestBuilder, service::SSService}; -use crate::internal_events::SinkRequestBuildError; use crate::sinks::aws_s_s::retry::SSRetryLogic; -use crate::{ - event::Event, - sinks::util::{ - builder::SinkBuilderExt, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig, - }, -}; +use crate::sinks::prelude::*; #[derive(Clone, Copy, Debug, Default)] pub(crate) struct SqsSinkDefaultBatchSettings; @@ -55,14 +43,16 @@ where let request = self .request .unwrap_with(&TowerRequestConfig::default().timeout_secs(30)); - let request_builder_concurrency_limit = NonZeroUsize::new(50); let retry_logic: SSRetryLogic = super::retry::SSRetryLogic::new(); let service = tower::ServiceBuilder::new() .settings(request, retry_logic) .service(self.service); input - .request_builder(request_builder_concurrency_limit, self.request_builder) + .request_builder( + default_request_builder_concurrency_limit(), + 
self.request_builder, + ) .filter_map(|req| async move { req.map_err(|error| { emit!(SinkRequestBuildError { error }); diff --git a/src/sinks/azure_common/sink.rs b/src/sinks/azure_common/sink.rs index c0655cc426d47..d46910f1e4d98 100644 --- a/src/sinks/azure_common/sink.rs +++ b/src/sinks/azure_common/sink.rs @@ -1,21 +1,6 @@ -use std::{fmt, num::NonZeroUsize}; +use std::fmt; -use async_trait::async_trait; -use futures::stream::BoxStream; -use futures_util::StreamExt; -use tower::Service; -use vector_common::request_metadata::MetaDescriptive; -use vector_core::{ - event::Finalizable, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, -}; - -use crate::{ - event::Event, - internal_events::SinkRequestBuildError, - sinks::util::{partitioner::KeyPartitioner, RequestBuilder, SinkBuilderExt}, -}; +use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner}; pub struct AzureBlobSink { service: Svc, @@ -54,7 +39,6 @@ where let partitioner = self.partitioner; let settings = self.batcher_settings; - let builder_limit = NonZeroUsize::new(64); let request_builder = self.request_builder; input @@ -65,7 +49,7 @@ where // that occurs. key.map(move |k| (k, batch)) }) - .request_builder(builder_limit, request_builder) + .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/azure_monitor_logs/sink.rs b/src/sinks/azure_monitor_logs/sink.rs index 7b610cedebaa9..ce06e04bb83b8 100644 --- a/src/sinks/azure_monitor_logs/sink.rs +++ b/src/sinks/azure_monitor_logs/sink.rs @@ -41,7 +41,7 @@ where input .batched(self.batch_settings.into_byte_size_config()) .request_builder( - None, + default_request_builder_concurrency_limit(), AzureMonitorLogsRequestBuilder { encoding: self.encoding, }, diff --git a/src/sinks/clickhouse/sink.rs b/src/sinks/clickhouse/sink.rs index df2261f1d6738..c951101c8ce21 100644 --- a/src/sinks/clickhouse/sink.rs +++ b/src/sinks/clickhouse/sink.rs @@ -49,7 +49,7 @@ impl ClickhouseSink { ) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder( - None, + default_request_builder_concurrency_limit(), ClickhouseRequestBuilder { compression: self.compression, encoding: self.encoding, diff --git a/src/sinks/databend/sink.rs b/src/sinks/databend/sink.rs index b0e67b9ea2662..87b3d1155cc9d 100644 --- a/src/sinks/databend/sink.rs +++ b/src/sinks/databend/sink.rs @@ -1,14 +1,4 @@ -use std::num::NonZeroUsize; - -use futures_util::{stream::BoxStream, StreamExt}; -use vector_core::event::Event; -use vector_core::sink::StreamSink; -use vector_core::stream::BatcherSettings; - -use crate::{ - internal_events::SinkRequestBuildError, - sinks::util::{service::Svc, SinkBuilderExt}, -}; +use crate::sinks::prelude::*; use super::request_builder::DatabendRequestBuilder; use super::service::{DatabendRetryLogic, DatabendService}; @@ -33,10 +23,12 @@ impl DatabendSink { } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let builder_limit = NonZeroUsize::new(64); input .batched(self.batch_settings.into_byte_size_config()) - .request_builder(builder_limit, self.request_builder) + .request_builder( + default_request_builder_concurrency_limit(), + self.request_builder, + ) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/datadog/events/sink.rs b/src/sinks/datadog/events/sink.rs index a85d23d829093..d917a44c51bce 100644 --- a/src/sinks/datadog/events/sink.rs +++ 
b/src/sinks/datadog/events/sink.rs @@ -1,17 +1,12 @@ -use std::{fmt, num::NonZeroUsize}; +use std::fmt; -use async_trait::async_trait; -use futures::{stream::BoxStream, StreamExt}; use lookup::event_path; -use tower::Service; -use vector_core::stream::DriverResponse; use crate::{ - event::Event, - internal_events::{ParserMissingFieldError, SinkRequestBuildError, DROP_EVENT}, + internal_events::{ParserMissingFieldError, DROP_EVENT}, sinks::{ datadog::events::request_builder::{DatadogEventsRequest, DatadogEventsRequestBuilder}, - util::{SinkBuilderExt, StreamSink}, + prelude::*, }, }; @@ -27,11 +22,12 @@ where S::Error: fmt::Debug + Into + Send, { async fn run(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let concurrency_limit = NonZeroUsize::new(50); - input .filter_map(ensure_required_fields) - .request_builder(concurrency_limit, DatadogEventsRequestBuilder::new()) + .request_builder( + default_request_builder_concurrency_limit(), + DatadogEventsRequestBuilder::new(), + ) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 33b823ad3b92c..363fa55383d2b 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -1,30 +1,14 @@ -use std::{fmt::Debug, io, num::NonZeroUsize, sync::Arc}; +use std::{fmt::Debug, io, sync::Arc}; -use async_trait::async_trait; use bytes::Bytes; use codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig}; -use futures::stream::{BoxStream, StreamExt}; use lookup::event_path; use snafu::Snafu; -use tower::Service; -use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata}; -use vector_core::{ - event::{Event, EventFinalizers, Finalizable, Value}, - partition::Partitioner, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, -}; use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest}; -use crate::{ - codecs::{Encoder, Transformer}, - internal_events::SinkRequestBuildError, - sinks::util::{ - encoding::{write_all, Encoder as _}, - metadata::RequestMetadataBuilder, - request_builder::EncodeResult, - Compression, Compressor, RequestBuilder, SinkBuilderExt, - }, +use crate::sinks::{ + prelude::*, + util::{encoding::Encoder as _, Compressor}, }; #[derive(Default)] struct EventPartitioner; @@ -278,11 +262,10 @@ where let partitioner = EventPartitioner; - let builder_limit = NonZeroUsize::new(64); let input = input.batched_partitioned(partitioner, self.batch_settings); input .request_builder( - builder_limit, + default_request_builder_concurrency_limit(), LogRequestBuilder { default_api_key, encoding: self.encoding, diff --git a/src/sinks/elasticsearch/sink.rs b/src/sinks/elasticsearch/sink.rs index 7800f4d975a91..a3168d5b44809 100644 --- a/src/sinks/elasticsearch/sink.rs +++ b/src/sinks/elasticsearch/sink.rs @@ -1,22 +1,15 @@ -use std::{fmt, num::NonZeroUsize}; +use std::fmt; -use async_trait::async_trait; -use futures::{future, stream::BoxStream, StreamExt}; use lookup::lookup_v2::ConfigValuePath; -use tower::Service; -use vector_core::stream::{BatcherSettings, DriverResponse}; use vrl::path::PathPrefix; use crate::{ - codecs::Transformer, - event::{Event, LogEvent, Value}, - internal_events::SinkRequestBuildError, sinks::{ elasticsearch::{ encoder::ProcessedEvent, request_builder::ElasticsearchRequestBuilder, service::ElasticsearchRequest, BulkAction, ElasticsearchCommonMode, }, - util::{SinkBuilderExt, StreamSink}, + prelude::*, }, transforms::metric_to_log::MetricToLog, }; @@ 
-67,8 +60,6 @@ where S::Error: fmt::Debug + Into + Send, { pub async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let request_builder_concurrency_limit = NonZeroUsize::new(50); - let mode = self.mode; let id_key_field = self.id_key_field.as_ref(); let transformer = self.transformer.clone(); @@ -91,7 +82,10 @@ where future::ready(process_log(log, &mode, id_key_field, &transformer)) }) .batched(self.batch_settings.into_byte_size_config()) - .request_builder(request_builder_concurrency_limit, self.request_builder) + .request_builder( + default_request_builder_concurrency_limit(), + self.request_builder, + ) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/gcp/stackdriver/logs/sink.rs b/src/sinks/gcp/stackdriver/logs/sink.rs index c15ed0f342ce6..39870bca24432 100644 --- a/src/sinks/gcp/stackdriver/logs/sink.rs +++ b/src/sinks/gcp/stackdriver/logs/sink.rs @@ -41,7 +41,10 @@ where .into_item_size_config(HttpJsonBatchSizer), ) // Build requests with no concurrency limit. - .request_builder(None, self.request_builder) + .request_builder( + default_request_builder_concurrency_limit(), + self.request_builder, + ) // Filter out any errors that occurred in the request building. .filter_map(|request| async move { match request { diff --git a/src/sinks/gcs_common/sink.rs b/src/sinks/gcs_common/sink.rs index 2f3991e5182d2..830dbd4ca6301 100644 --- a/src/sinks/gcs_common/sink.rs +++ b/src/sinks/gcs_common/sink.rs @@ -1,21 +1,6 @@ -use std::{fmt, num::NonZeroUsize}; +use std::fmt; -use async_trait::async_trait; -use futures::stream::BoxStream; -use futures_util::StreamExt; -use tower::Service; -use vector_common::request_metadata::MetaDescriptive; -use vector_core::{ - event::Finalizable, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, -}; - -use crate::{ - event::Event, - internal_events::SinkRequestBuildError, - sinks::util::{partitioner::KeyPartitioner, RequestBuilder, SinkBuilderExt}, -}; +use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner}; pub struct GcsSink { service: Svc, @@ -57,7 +42,6 @@ where let partitioner = self.partitioner; let settings = self.batcher_settings; - let builder_limit = NonZeroUsize::new(64); let request_builder = self.request_builder; input @@ -67,7 +51,7 @@ where // thus no further `EventsDropped` event needs emitting at this stage. key.map(move |k| (k, batch)) }) - .request_builder(builder_limit, request_builder) + .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/honeycomb/sink.rs b/src/sinks/honeycomb/sink.rs index 9575577b91e11..e028f68982263 100644 --- a/src/sinks/honeycomb/sink.rs +++ b/src/sinks/honeycomb/sink.rs @@ -40,8 +40,8 @@ where self.batch_settings .into_item_size_config(HttpJsonBatchSizer), ) - // Build requests with no concurrency limit. - .request_builder(None, self.request_builder) + // Build requests with default concurrency limit. + .request_builder(default_request_builder_concurrency_limit(), self.request_builder) // Filter out any errors that occurred in the request building. 
.filter_map(|request| async move { match request { diff --git a/src/sinks/http/sink.rs b/src/sinks/http/sink.rs index ee997cd82ad33..e75fc61f53668 100644 --- a/src/sinks/http/sink.rs +++ b/src/sinks/http/sink.rs @@ -36,8 +36,8 @@ where .batched(self.batch_settings.into_item_size_config(HttpBatchSizer { encoder: self.request_builder.encoder.encoder.clone(), })) - // Build requests with no concurrency limit. - .request_builder(None, self.request_builder) + // Build requests with default concurrency limit. + .request_builder(default_request_builder_concurrency_limit(), self.request_builder) // Filter out any errors that occurred in the request building. .filter_map(|request| async move { match request { diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index 8a981a987c79a..411376383833a 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -1,5 +1,3 @@ -use std::num::NonZeroUsize; - use rdkafka::{ consumer::{BaseConsumer, Consumer}, error::KafkaError, @@ -68,7 +66,6 @@ impl KafkaSink { // 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying // buffer is full. let service = ConcurrencyLimit::new(self.service.clone(), 64); - let builder_limit = NonZeroUsize::new(64); let request_builder = KafkaRequestBuilder { key_field: self.key_field, @@ -93,7 +90,7 @@ impl KafkaSink { .map(|topic| (topic, event)), ) }) - .request_builder(builder_limit, request_builder) + .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async { match request { Err(error) => { diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 0d9cb9fbb43e1..89ae9857ee746 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -446,7 +446,7 @@ impl LokiSink { // out_of_order_action's that require a complete ordering are limited to building 1 request // at a time let request_builder_concurrency = match self.out_of_order_action { - OutOfOrderAction::Accept => NonZeroUsize::new(50).expect("static"), + OutOfOrderAction::Accept => default_request_builder_concurrency_limit(), OutOfOrderAction::Drop | OutOfOrderAction::RewriteTimestamp => { NonZeroUsize::new(1).expect("static") } @@ -479,7 +479,7 @@ impl LokiSink { None } }) - .request_builder(Some(request_builder_concurrency), self.request_builder) + .request_builder(request_builder_concurrency, self.request_builder) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/nats/sink.rs b/src/sinks/nats/sink.rs index 97fc747dc2f89..f6bea42919e9d 100644 --- a/src/sinks/nats/sink.rs +++ b/src/sinks/nats/sink.rs @@ -79,7 +79,7 @@ impl NatsSink { input .filter_map(|event| std::future::ready(self.make_nats_event(event))) - .request_builder(None, request_builder) + .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { match request { Err(e) => { diff --git a/src/sinks/new_relic/sink.rs b/src/sinks/new_relic/sink.rs index 384a1b7a54e52..83f0c237b786e 100644 --- a/src/sinks/new_relic/sink.rs +++ b/src/sinks/new_relic/sink.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, num::NonZeroUsize, sync::Arc}; +use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; use bytes::Bytes; @@ -115,7 +115,6 @@ where S::Error: Debug + Into + Send, { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let builder_limit = NonZeroUsize::new(64); let request_builder = NewRelicRequestBuilder { encoder: self.encoder, compression: self.compression, @@ 
-125,7 +124,7 @@ where input .batched(self.batcher_settings.into_byte_size_config()) - .request_builder(builder_limit, request_builder) + .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map( |request: Result| async move { match request { diff --git a/src/sinks/opendal_common.rs b/src/sinks/opendal_common.rs index f8e5877b8e1ed..3ff05f214f1c7 100644 --- a/src/sinks/opendal_common.rs +++ b/src/sinks/opendal_common.rs @@ -8,38 +8,15 @@ //! - Error handling //! - Limitation -use std::{fmt, num::NonZeroUsize, task::Poll}; +use std::{fmt, task::Poll}; use bytes::Bytes; use codecs::encoding::Framer; -use futures::{stream::BoxStream, StreamExt}; use opendal::Operator; use snafu::Snafu; -use tower::Service; use tracing::Instrument; -use vector_common::{ - finalization::{EventStatus, Finalizable}, - json_size::JsonSize, - request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}, -}; -use vector_core::{ - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, - EstimatedJsonEncodedSizeOf, -}; - -use crate::{ - codecs::{Encoder, Transformer}, - event::{Event, EventFinalizers}, - internal_events::SinkRequestBuildError, - sinks::{ - util::{ - metadata::RequestMetadataBuilder, partitioner::KeyPartitioner, - request_builder::EncodeResult, Compression, RequestBuilder, SinkBuilderExt, - }, - BoxFuture, - }, -}; + +use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner}; /// OpenDalSink provides generic a service upon OpenDAL. /// @@ -98,7 +75,6 @@ where let partitioner = self.partitioner; let settings = self.batcher_settings; - let builder_limit = NonZeroUsize::new(64); let request_builder = self.request_builder; input @@ -109,7 +85,7 @@ where // that occurs. key.map(move |k| (k, batch)) }) - .request_builder(builder_limit, request_builder) + .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs index ecc62097aaef2..d118545537604 100644 --- a/src/sinks/prelude.rs +++ b/src/sinks/prelude.rs @@ -16,7 +16,7 @@ pub use crate::{ builder::SinkBuilderExt, encoding::{self, write_all}, metadata::RequestMetadataBuilder, - request_builder::EncodeResult, + request_builder::{default_request_builder_concurrency_limit, EncodeResult}, retries::{RetryAction, RetryLogic}, service::{ServiceBuilderExt, Svc}, BatchConfig, Compression, Concurrency, NoDefaultsBatchSettings, RequestBuilder, diff --git a/src/sinks/pulsar/sink.rs b/src/sinks/pulsar/sink.rs index 8644aa561dc58..7bd26ac24f5a8 100644 --- a/src/sinks/pulsar/sink.rs +++ b/src/sinks/pulsar/sink.rs @@ -112,7 +112,7 @@ impl PulsarSink { event, )) }) - .request_builder(None, request_builder) + .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { request .map_err(|e| error!("Failed to build Pulsar request: {:?}.", e)) diff --git a/src/sinks/s3_common/sink.rs b/src/sinks/s3_common/sink.rs index 08e33674a02ad..c26f6f93e8a0f 100644 --- a/src/sinks/s3_common/sink.rs +++ b/src/sinks/s3_common/sink.rs @@ -1,21 +1,6 @@ -use std::{fmt, num::NonZeroUsize}; +use std::fmt; -use async_trait::async_trait; -use futures::stream::BoxStream; -use futures_util::StreamExt; -use tower::Service; -use vector_common::request_metadata::MetaDescriptive; -use vector_core::{ - event::Finalizable, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, -}; - -use crate::internal_events::SinkRequestBuildError; -use 
crate::{ - event::Event, - sinks::util::{RequestBuilder, SinkBuilderExt}, -}; +use crate::sinks::prelude::*; use super::partitioner::{S3KeyPartitioner, S3PartitionKey}; @@ -56,13 +41,12 @@ where let partitioner = self.partitioner; let settings = self.batcher_settings; - let builder_limit = NonZeroUsize::new(64); let request_builder = self.request_builder; input .batched_partitioned(partitioner, settings) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) - .request_builder(builder_limit, request_builder) + .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { match request { Err(error) => { diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index 2533add3438a2..555a3d9b95076 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -1,32 +1,19 @@ -use std::{fmt, num::NonZeroUsize, sync::Arc}; +use std::{fmt, sync::Arc}; -use async_trait::async_trait; -use futures_util::{stream::BoxStream, StreamExt}; use serde::Serialize; -use tower::Service; -use vector_buffers::EventCount; -use vector_core::{ - event::{Event, LogEvent, Value}, - partition::Partitioner, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, - ByteSizeOf, -}; use super::request_builder::HecLogsRequestBuilder; use crate::{ - config::SinkContext, internal_events::SplunkEventTimestampInvalidType, internal_events::SplunkEventTimestampMissing, - internal_events::TemplateRenderingError, sinks::{ + prelude::*, splunk_hec::common::{ render_template_string, request::HecRequest, EndpointTarget, INDEX_FIELD, SOURCETYPE_FIELD, SOURCE_FIELD, }, - util::{processed_event::ProcessedEvent, SinkBuilderExt}, + util::processed_event::ProcessedEvent, }, - template::Template, }; use lookup::{event_path, OwnedValuePath, PathPrefix}; @@ -64,8 +51,6 @@ where S::Error: fmt::Debug + Into + Send, { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let builder_limit = NonZeroUsize::new(64); - let data = HecLogData { sourcetype: self.sourcetype.as_ref(), source: self.source.as_ref(), @@ -94,7 +79,10 @@ where }, self.batch_settings, ) - .request_builder(builder_limit, self.request_builder) + .request_builder( + default_request_builder_concurrency_limit(), + self.request_builder, + ) .filter_map(|request| async move { match request { Err(e) => { diff --git a/src/sinks/splunk_hec/metrics/sink.rs b/src/sinks/splunk_hec/metrics/sink.rs index f41e397a7acac..ef42d21b60370 100644 --- a/src/sinks/splunk_hec/metrics/sink.rs +++ b/src/sinks/splunk_hec/metrics/sink.rs @@ -1,28 +1,17 @@ -use std::{fmt, num::NonZeroUsize, sync::Arc}; +use std::{fmt, sync::Arc}; -use async_trait::async_trait; -use futures_util::{future, stream::BoxStream, StreamExt}; use serde::Serialize; -use tower::Service; -use vector_buffers::EventCount; -use vector_core::{ - event::{Event, Metric, MetricValue}, - partition::Partitioner, - sink::StreamSink, - stream::{BatcherSettings, DriverResponse}, - ByteSizeOf, -}; +use vector_core::event::{Metric, MetricValue}; use vrl::path::OwnedValuePath; use super::request_builder::HecMetricsRequestBuilder; use crate::{ - config::SinkContext, internal_events::SplunkInvalidMetricReceivedError, sinks::{ + prelude::*, splunk_hec::common::{render_template_string, request::HecRequest}, - util::{encode_namespace, processed_event::ProcessedEvent, SinkBuilderExt}, + util::{encode_namespace, processed_event::ProcessedEvent}, }, - template::Template, }; pub struct HecMetricsSink { 
@@ -51,7 +40,6 @@
 
         let host_key = self.host_key.as_ref();
         let default_namespace = self.default_namespace.as_deref();
-        let builder_limit = NonZeroUsize::new(64);
 
         input
             .map(|event| (event.size_of(), event.into_metric()))
             .filter_map(move |(event_byte_size, metric)| {
@@ -66,7 +54,10 @@
                 ))
             })
             .batched_partitioned(EventPartitioner, self.batch_settings)
-            .request_builder(builder_limit, self.request_builder)
+            .request_builder(
+                default_request_builder_concurrency_limit(),
+                self.request_builder,
+            )
             .filter_map(|request| async move {
                 match request {
                     Err(e) => {
diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs
index 0d7af1635a4b7..c51bf405dc4d1 100644
--- a/src/sinks/util/builder.rs
+++ b/src/sinks/util/builder.rs
@@ -94,16 +94,15 @@ pub trait SinkBuilderExt: Stream {
     /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to
     /// downstream services.
     ///
-    /// Each input is transformed concurrently, up to the given limit. A limit of `None` is
-    /// self-describing, as it imposes no concurrency limit, and `Some(n)` limits this stage to `n`
-    /// concurrent operations at any given time.
+    /// Each input is transformed concurrently, up to the given limit. A limit of `n` caps
+    /// this stage at `n` concurrent operations at any given time.
     ///
     /// Encoding and compression are handled internally, deferring to the builder at the necessary
     /// checkpoints for adjusting the event before encoding/compression, as well as generating the
     /// correct request object with the result of encoding/compressing the events.
     fn request_builder<B>(
         self,
-        limit: Option<NonZeroUsize>,
+        limit: NonZeroUsize,
         builder: B,
     ) -> ConcurrentMap<Self, Option<Result<B::Request, B::Error>>>
     where
@@ -115,7 +114,7 @@
     {
         let builder = Arc::new(builder);
 
-        self.concurrent_map(limit, move |input| {
+        self.concurrent_map(Some(limit), move |input| {
             let builder = Arc::clone(&builder);
 
             Box::pin(async move {
diff --git a/src/sinks/util/request_builder.rs b/src/sinks/util/request_builder.rs
index 86501f3a96a04..9756d871d2246 100644
--- a/src/sinks/util/request_builder.rs
+++ b/src/sinks/util/request_builder.rs
@@ -1,10 +1,18 @@
-use std::io;
+use std::{io, num::NonZeroUsize};
 
 use bytes::Bytes;
 use vector_common::request_metadata::{GroupedCountByteSize, RequestMetadata};
 
 use super::{encoding::Encoder, metadata::RequestMetadataBuilder, Compression, Compressor};
 
+/// Default concurrency limit for a request builder
+const DEFAULT_REQUEST_BUILDER_CONCURRENCY_LIMIT: Option<NonZeroUsize> = NonZeroUsize::new(64);
+
+pub fn default_request_builder_concurrency_limit() -> NonZeroUsize {
+    DEFAULT_REQUEST_BUILDER_CONCURRENCY_LIMIT
+        .expect("request builder concurrency limit should be non-zero constant")
+}
+
 pub struct EncodeResult
<P>
{
    pub payload: P,
    pub uncompressed_byte_size: usize,

From 0fb40170f312ab9513733160fb4888c718cbbec5 Mon Sep 17 00:00:00 2001
From: Doug Smith
Date: Thu, 21 Sep 2023 15:58:12 -0400
Subject: [PATCH 04/10] fmt

---
 src/sinks/honeycomb/sink.rs | 5 ++++-
 src/sinks/http/sink.rs | 5 ++++-
 src/sinks/new_relic/sink.rs | 2 +-
 3 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/src/sinks/honeycomb/sink.rs b/src/sinks/honeycomb/sink.rs
index e028f68982263..235dcd6eb4893 100644
--- a/src/sinks/honeycomb/sink.rs
+++ b/src/sinks/honeycomb/sink.rs
@@ -41,7 +41,10 @@ where
                 .into_item_size_config(HttpJsonBatchSizer),
         )
         // Build requests with default concurrency limit.
-        .request_builder(default_request_builder_concurrency_limit(), self.request_builder)
+        .request_builder(
+            default_request_builder_concurrency_limit(),
+            self.request_builder,
+        )
         // Filter out any errors that occurred in the request building.
         .filter_map(|request| async move {
             match request {
diff --git a/src/sinks/http/sink.rs b/src/sinks/http/sink.rs
index e75fc61f53668..8427201b31662 100644
--- a/src/sinks/http/sink.rs
+++ b/src/sinks/http/sink.rs
@@ -37,7 +37,10 @@ where
             encoder: self.request_builder.encoder.encoder.clone(),
         }))
         // Build requests with default concurrency limit.
-        .request_builder(default_request_builder_concurrency_limit(), self.request_builder)
+        .request_builder(
+            default_request_builder_concurrency_limit(),
+            self.request_builder,
+        )
         // Filter out any errors that occurred in the request building.
         .filter_map(|request| async move {
             match request {
diff --git a/src/sinks/new_relic/sink.rs b/src/sinks/new_relic/sink.rs
index 83f0c237b786e..fd93c90bbd61b 100644
--- a/src/sinks/new_relic/sink.rs
+++ b/src/sinks/new_relic/sink.rs
@@ -1,4 +1,4 @@
-use std::{fmt::Debug, sync::Arc};
+use std::{fmt::Debug, sync::Arc};
 
 use async_trait::async_trait;
 use bytes::Bytes;

From 7ae0fca4cd0cc48fb003678612aade124dd2d09b Mon Sep 17 00:00:00 2001
From: Doug Smith
Date: Fri, 22 Sep 2023 14:14:04 -0400
Subject: [PATCH 05/10] set default to WORKER_THREADS and allow env var override

---
 src/app.rs | 10 +++++-----
 src/sinks/util/request_builder.rs | 16 +++++++++++-----
 2 files changed, 16 insertions(+), 10 deletions(-)

diff --git a/src/app.rs b/src/app.rs
index e8ac0a6061233..a1a6500b281da 100644
--- a/src/app.rs
+++ b/src/app.rs
@@ -468,14 +468,14 @@ pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<
diff --git a/src/sinks/util/request_builder.rs b/src/sinks/util/request_builder.rs
--- a/src/sinks/util/request_builder.rs
+++ b/src/sinks/util/request_builder.rs
-/// Default concurrency limit for a request builder
-const DEFAULT_REQUEST_BUILDER_CONCURRENCY_LIMIT: Option<NonZeroUsize> = NonZeroUsize::new(64);
-
 pub fn default_request_builder_concurrency_limit() -> NonZeroUsize {
-    DEFAULT_REQUEST_BUILDER_CONCURRENCY_LIMIT
-        .expect("request builder concurrency limit should be non-zero constant")
+    if let Some(limit) = std::env::var("VECTOR_EXPERIMENTAL_REQUEST_BUILDER_CONCURRENCY")
+        .map(|value| value.parse::<NonZeroUsize>().ok())
+        .ok()
+        .flatten()
+    {
+        return limit;
+    }
+
+    crate::app::WORKER_THREADS
+        .get()
+        .unwrap_or_else(|| NonZeroUsize::new(8).expect("static"))
 }
 
pub struct EncodeResult
<P>
{ From 4876f76eacd5020067b701ba0241cfc006b0371f Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Mon, 25 Sep 2023 11:18:00 -0400 Subject: [PATCH 06/10] fix bad merge --- src/sinks/kafka/sink.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index 84e41939e83c4..59701ad0542f7 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -72,7 +72,6 @@ impl KafkaSink { key_field: self.key_field, headers_key: self.headers_key, encoder: (self.transformer, self.encoder), - encoder: (self.transformer, self.encoder), }; input From fd32323707b572f349ad3cdc458fe1a99996de0b Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Mon, 25 Sep 2023 11:27:40 -0400 Subject: [PATCH 07/10] fix bad merge --- src/sinks/kafka/sink.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index 59701ad0542f7..141c32f7cb3b7 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -13,7 +13,6 @@ use super::config::{KafkaRole, KafkaSinkConfig}; use crate::{ kafka::KafkaStatisticsContext, sinks::kafka::{request_builder::KafkaRequestBuilder, service::KafkaService}, - sinks::kafka::{request_builder::KafkaRequestBuilder, service::KafkaService}, sinks::prelude::*, }; From 8e124689ab2a5bdf760397b392c3c3427da2c953 Mon Sep 17 00:00:00 2001 From: Doug Smith Date: Mon, 25 Sep 2023 12:58:51 -0400 Subject: [PATCH 08/10] revert cargo.lock --- Cargo.lock | 70 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5ed2466b9af8..a8b7964e585a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -674,7 +674,7 @@ dependencies = [ "time", "tokio", "tokio-retry", - "tokio-rustls 0.24.0", + "tokio-rustls 0.24.1", "tracing 0.1.37", "url", ] @@ -1638,7 +1638,7 @@ dependencies = [ "hyperlocal", "log", "pin-project-lite", - "rustls 0.21.6", + "rustls 0.21.7", "rustls-native-certs", "rustls-pemfile", "rustls-webpki", @@ -4265,10 +4265,10 @@ dependencies = [ "http", "hyper", "log", - "rustls 0.21.6", + "rustls 0.21.7", "rustls-native-certs", "tokio", - "tokio-rustls 0.24.0", + "tokio-rustls 0.24.1", ] [[package]] @@ -5126,10 +5126,11 @@ dependencies = [ [[package]] name = "md-5" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" dependencies = [ + "cfg-if", "digest", ] @@ -5897,8 +5898,8 @@ dependencies = [ "ordered-float 3.9.1", "prost 0.12.1", "prost-build 0.12.1", - "tonic 0.10.0", - "tonic-build 0.10.0", + "tonic 0.10.1", + "tonic-build 0.10.1", "vector-core", "vector-lookup", "vrl", @@ -7185,14 +7186,14 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls 0.21.6", + "rustls 0.21.7", "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "tokio", "tokio-native-tls", - "tokio-rustls 0.24.0", + "tokio-rustls 0.24.1", "tokio-util", "tower-service", "url", @@ -7458,9 +7459,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.6" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" +checksum = "cd8d6c9f025a446bc4d18ad9632e69aec8f287aa84499ee335599fabd20c3fd8" dependencies = [ "log", "ring", @@ -8073,9 +8074,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.0" +version 
= "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62bb4feee49fdd9f707ef802e22365a35de4b7b299de4763d44bfea899442ff9" +checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" dependencies = [ "serde", ] @@ -8551,9 +8552,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.29" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" +checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" dependencies = [ "deranged", "itoa", @@ -8566,15 +8567,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.2" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.15" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ad70d68dba9e1f8aceda7aa6711965dfec1cac869f311a51bd08b3a2ccbce20" +checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" dependencies = [ "time-core", ] @@ -8737,11 +8738,11 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d409377ff5b1e3ca6437aa86c1eb7d40c134bfec254e44c830defa92669db5" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.6", + "rustls 0.21.7", "tokio", ] @@ -8790,7 +8791,7 @@ checksum = "2b2dbec703c26b00d74844519606ef15d09a7d6857860f84ad223dec002ddea2" dependencies = [ "futures-util", "log", - "rustls 0.21.6", + "rustls 0.21.7", "tokio", "tungstenite 0.20.0", ] @@ -8876,7 +8877,7 @@ dependencies = [ "prost 0.11.9", "rustls-pemfile", "tokio", - "tokio-rustls 0.24.0", + "tokio-rustls 0.24.1", "tokio-stream", "tower", "tower-layer", @@ -8886,9 +8887,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5469afaf78a11265c343a88969045c1568aa8ecc6c787dbf756e92e70f199861" +checksum = "14c00bc15e49625f3d2f20b17082601e5e17cf27ead69e805174026c194b6664" dependencies = [ "async-stream", "async-trait", @@ -8904,10 +8905,11 @@ dependencies = [ "percent-encoding", "pin-project", "prost 0.12.1", + "rustls 0.21.7", "rustls-native-certs", "rustls-pemfile", "tokio", - "tokio-rustls 0.24.0", + "tokio-rustls 0.24.1", "tokio-stream", "tower", "tower-layer", @@ -8930,9 +8932,9 @@ dependencies = [ [[package]] name = "tonic-build" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b477abbe1d18c0b08f56cd01d1bc288668c5b5cfd19b2ae1886bbf599c546f1" +checksum = "c9d37bb15da06ae9bb945963066baca6561b505af93a52e949a85d28558459a2" dependencies = [ "prettyplease 0.2.12", "proc-macro2 1.0.67", @@ -9734,8 +9736,8 @@ dependencies = [ "tokio-tungstenite 0.20.0", "tokio-util", "toml 0.8.0", - "tonic 0.10.0", - "tonic-build 0.10.0", + "tonic 0.10.1", + "tonic-build 0.10.1", "tower", "tower-http", "tower-test", @@ -9983,7 +9985,7 @@ dependencies = [ "tokio-test", "tokio-util", "toml 0.8.0", - "tonic 0.10.0", + "tonic 0.10.1", "tower", "tracing 0.1.37", "tracing-core 0.1.30", @@ -10797,4 +10799,4 @@ dependencies = [ "cc", "libc", "pkg-config", -] 
+]
\ No newline at end of file

From 09930ecf2af2fe6f7c89463c3011a289ca7d4bd3 Mon Sep 17 00:00:00 2001
From: Doug Smith
Date: Mon, 25 Sep 2023 13:01:42 -0400
Subject: [PATCH 09/10] revert cargo.lock

---
 Cargo.lock | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index a8b7964e585a4..a4a9e90d3c5a6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10799,4 +10799,4 @@ dependencies = [
  "cc",
  "libc",
  "pkg-config",
-]
\ No newline at end of file
+]

From 76dc42a8f82428568b4bdf3179e09758bc96e680 Mon Sep 17 00:00:00 2001
From: Doug Smith
Date: Mon, 25 Sep 2023 15:20:49 -0400
Subject: [PATCH 10/10] fix flaky test

---
 src/sinks/amqp/integration_tests.rs | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/src/sinks/amqp/integration_tests.rs b/src/sinks/amqp/integration_tests.rs
index 08db9c6e46b27..194a2edae6947 100644
--- a/src/sinks/amqp/integration_tests.rs
+++ b/src/sinks/amqp/integration_tests.rs
@@ -10,7 +10,7 @@ use crate::{
     SourceSender,
 };
 use futures::StreamExt;
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashSet, sync::Arc, time::Duration};
 use vector_core::config::LogNamespace;
 
 pub fn make_config() -> AmqpSinkConfig {
@@ -129,6 +129,9 @@ async fn amqp_happy_path() {
     }
 
     assert_eq!(out.len(), input.len());
+
+    let input: HashSet<String> = HashSet::from_iter(input);
+    let out: HashSet<String> = HashSet::from_iter(out);
     assert_eq!(out, input);
 }
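
A note for readers on the shape of the Kafka change in PATCH 01: topic-template rendering now happens before the request-builder stage, so events whose template fails to render are dropped early and the RequestBuilder impl receives ready-made (topic, event) pairs instead of rendering inside a mutable build_request. The following is a minimal standalone sketch of that stream shape, assuming a tokio runtime (with the "macros" and "rt" features) and plain strings standing in for Vector's Event and Template types; it is illustrative only, not the sink's actual code.

    use futures::{stream, StreamExt};

    // Stand-in for Template::render_string: a real render can fail per event
    // (for example, a referenced field is missing); an empty payload stands
    // in for that failure here.
    fn render_topic(event: &str) -> Option<String> {
        (!event.is_empty()).then(|| format!("logs-{event}"))
    }

    #[tokio::main]
    async fn main() {
        let events = stream::iter(["ok-1", "", "ok-2"]);

        // Render the topic up front and drop events that cannot be rendered,
        // mirroring the sink's filter_map stage ahead of request_builder.
        let requests: Vec<(String, &str)> = events
            .filter_map(|event| async move { render_topic(event).map(|topic| (topic, event)) })
            .collect()
            .await;

        // Only the two renderable events reach the request-building stage.
        assert_eq!(requests.len(), 2);
        println!("{requests:?}");
    }

Moving the render ahead of request_builder is also what lets KafkaRequestBuilder become an immutable RequestBuilder impl (&self rather than &mut self) and share the standard encode/build pipeline.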
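And on PATCH 05: the request builder concurrency limit now resolves in three steps: an explicit VECTOR_EXPERIMENTAL_REQUEST_BUILDER_CONCURRENCY override, failing that the worker-thread count recorded in crate::app::WORKER_THREADS at startup, failing that a floor of 8. A minimal sketch of that fallback chain, with a plain Option<NonZeroUsize> argument standing in for the WORKER_THREADS static:

    use std::num::NonZeroUsize;

    // Resolution order for the limit: environment override, then the
    // runtime's worker-thread count, then a fixed floor of 8.
    fn request_builder_concurrency(worker_threads: Option<NonZeroUsize>) -> NonZeroUsize {
        if let Some(limit) = std::env::var("VECTOR_EXPERIMENTAL_REQUEST_BUILDER_CONCURRENCY")
            .ok()
            .and_then(|value| value.parse::<NonZeroUsize>().ok())
        {
            return limit;
        }

        worker_threads.unwrap_or_else(|| NonZeroUsize::new(8).expect("static"))
    }

    fn main() {
        // With the env var unset and a 4-thread runtime, the limit is 4.
        let limit = request_builder_concurrency(NonZeroUsize::new(4));
        println!("request builder concurrency limit: {limit}");
    }

Unlike the fixed Some(64) from PATCH 03, this ties request-building parallelism to the size of the runtime while keeping an operator escape hatch; note the patch itself writes the override lookup as .map(...).ok().flatten(), for which the sketch uses the equivalent .ok().and_then(...).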