diff --git a/lib/vector-core/src/stream/batcher/mod.rs b/lib/vector-core/src/stream/batcher/mod.rs index a7153a5a8665d..bfb20e95e4160 100644 --- a/lib/vector-core/src/stream/batcher/mod.rs +++ b/lib/vector-core/src/stream/batcher/mod.rs @@ -129,10 +129,7 @@ mod test { NonZeroUsize::new(10000).unwrap(), NonZeroUsize::new(2).unwrap(), ); - let batcher = Batcher::new( - stream, - settings.into_item_size_config(|x: &u32| *x as usize), - ); + let batcher = Batcher::new(stream, settings.as_item_size_config(|x: &u32| *x as usize)); let batches: Vec<_> = batcher.collect().await; assert_eq!(batches, vec![vec![1, 2], vec![3],]); } @@ -146,7 +143,7 @@ mod test { NonZeroUsize::new(5).unwrap(), NonZeroUsize::new(100).unwrap(), ) - .into_item_size_config(|x: &u32| *x as usize), + .as_item_size_config(|x: &u32| *x as usize), ); let batches: Vec<_> = batcher.collect().await; assert_eq!( @@ -176,7 +173,7 @@ mod test { NonZeroUsize::new(5).unwrap(), NonZeroUsize::new(100).unwrap(), ) - .into_item_size_config(|x: &u32| *x as usize), + .as_item_size_config(|x: &u32| *x as usize), ); tokio::pin!(batcher); diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs index aa2f7e6b58bcd..f7d0d55b9556f 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-core/src/stream/partitioned_batcher.rs @@ -1,8 +1,6 @@ use std::{ - cmp, collections::HashMap, hash::{BuildHasherDefault, Hash}, - mem, num::NonZeroUsize, pin::Pin, task::{ready, Context, Poll}, @@ -25,6 +23,8 @@ use crate::{ ByteSizeOf, }; +use super::batcher::BatchConfig; + /// A `KeyedTimer` based on `DelayQueue`. 
pub struct ExpirationQueue { /// The timeout to give each new key entry @@ -86,6 +86,12 @@ where } } + fn remove(&mut self, item_key: &K) { + if let Some(expiration_key) = self.expiration_map.remove(item_key) { + self.expirations.remove(&expiration_key); + } + } + fn poll_expired(&mut self, cx: &mut Context) -> Poll> { match ready!(self.expirations.poll_expired(cx)) { // No expirations yet. @@ -100,114 +106,6 @@ where } } -/// A batch for use by `Batcher` -/// -/// This structure is a private implementation detail that simplifies the -/// implementation of `Batcher`. It is the actual store of items that come -/// through the stream manipulated by `Batcher` plus limit information to signal -/// when the `Batch` is full. -struct Batch { - /// The total number of `I` bytes stored, does not any overhead in this - /// structure. - allocated_bytes: usize, - /// The maximum number of elements allowed in this structure. - element_limit: usize, - /// The maximum number of allocated bytes (not including overhead) allowed - /// in this structure. - allocation_limit: usize, - /// The store of `I` elements. - elements: Vec, -} - -impl ByteSizeOf for Batch { - fn allocated_bytes(&self) -> usize { - self.allocated_bytes - } -} - -impl Batch -where - I: ByteSizeOf, -{ - /// Create a new Batch instance - /// - /// Creates a new batch instance with specific element and allocation limits. The element limit - /// is a maximum cap on the number of `I` instances. The allocation limit is a soft-max on the - /// number of allocated bytes stored in this batch, not taking into account overhead from this - /// structure itself. - /// - /// If `allocation_limit` is smaller than the size of `I` as reported by `std::mem::size_of`, - /// then the allocation limit will be raised such that the batch can hold a single instance of - /// `I`. Likewise, `element_limit` will be raised such that it is always at least 1, ensuring - /// that a new batch can be pushed into. 
- fn new(element_limit: usize, allocation_limit: usize) -> Self { - // SAFETY: `element_limit` is always non-zero because `BatcherSettings` can only be - // constructed with `NonZeroUsize` versions of allocation limit/item limit. `Batch` is also - // only constructable via `Batcher`. - - // TODO: This may need to be reworked, because it's subtly wrong as-is. - // ByteSizeOf::size_of() always returns the size of the type itself, plus any "allocated - // bytes". Thus, there are times when an item will be bigger than simply the size of the - // type itself (aka mem::size_of::()) and thus than type of item would never fit in a - // batch where the `allocation_limit` is at or lower than the size of that item. - // - // We're counteracting this here by ensuring that the element limit is always at least 1. - let allocation_limit = cmp::max(allocation_limit, mem::size_of::()); - Self { - allocated_bytes: 0, - element_limit, - allocation_limit, - elements: Vec::with_capacity(128), - } - } - - /// Unconditionally insert an element into the batch - /// - /// This function is similar to `push` except that the caller does not need - /// to call `has_space` prior to calling this and it will never - /// panic. Intended to be used only when insertion must not fail. - fn with(mut self, value: I) -> Self { - self.allocated_bytes += value.size_of(); - self.elements.push(value); - self - } - - /// Decompose the batch - /// - /// Called by the user when they want to get at the internal store of - /// items. Returns a tuple, the first element being the allocated size of - /// stored items and the second the store of items. - fn into_inner(self) -> Vec { - self.elements - } - - /// Whether the batch has space for a new item - /// - /// This function returns true of there is space both in terms of item count - /// and byte count for the given item, false otherwise. 
- fn has_space(&self, value: &I) -> bool { - let too_many_elements = self.elements.len() + 1 > self.element_limit; - let too_many_bytes = self.allocated_bytes + value.size_of() > self.allocation_limit; - !(too_many_elements || too_many_bytes) - } - - /// Push an element into the batch - /// - /// This function pushes an element into the batch. Callers must be sure to - /// call `has_space` prior to calling this function and receive a positive - /// result. - /// - /// # Panics - /// - /// This function will panic if there is not sufficient space in the batch - /// for a new element to be inserted. - fn push(&mut self, value: I) { - assert!(self.has_space(&value)); - self.allocated_bytes += value.size_of(); - self.elements.push(value); - } -} - /// Controls the behavior of the batcher in terms of batch size and flush interval. /// /// This is a temporary solution for pushing in a fixed settings structure so we don't have to worry @@ -237,15 +135,15 @@ impl BatcherSettings { /// A batcher config using the `ByteSizeOf` trait to determine batch sizes. /// The output is a `Vec`. - pub fn into_byte_size_config( - self, + pub fn as_byte_size_config( + &self, ) -> BatchConfigParts, Vec> { - self.into_item_size_config(ByteSizeOfItemSize) + self.as_item_size_config(ByteSizeOfItemSize) } /// A batcher config using the `ItemBatchSize` trait to determine batch sizes. /// The output is a `Vec`. - pub fn into_item_size_config(self, item_size: I) -> BatchConfigParts, Vec> + pub fn as_item_size_config(&self, item_size: I) -> BatchConfigParts, Vec> where I: ItemBatchSize, { @@ -287,18 +185,16 @@ impl BatcherSettings { } #[pin_project] -pub struct PartitionedBatcher +pub struct PartitionedBatcher where Prt: Partitioner, { - /// The total number of bytes a single batch in this struct is allowed to - /// hold. 
- batch_allocation_limit: usize, - /// The maximum number of items that are allowed per-batch - batch_item_limit: usize, + /// A closure that retrieves a new [`BatchConfig`] when needed to batch a + /// new partition. + state: F, /// The store of live batches. Note that the key here is an option type, /// on account of the interface of `Prt`. - batches: HashMap, BuildHasherDefault>, + batches: HashMap>, /// The store of 'closed' batches. When this is not empty it will be /// preferentially flushed prior to consuming any new items from the /// underlying stream. @@ -312,44 +208,41 @@ where stream: Fuse, } -impl PartitionedBatcher> +impl PartitionedBatcher, C, F> where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, + C: BatchConfig, + F: Fn() -> C + Send, { - pub fn new(stream: St, partitioner: Prt, settings: BatcherSettings) -> Self { + pub fn new(stream: St, partitioner: Prt, settings: F) -> Self { + let timeout = settings().timeout(); Self { - batch_allocation_limit: settings.size_limit, - batch_item_limit: settings.item_limit, + state: settings, batches: HashMap::default(), closed_batches: Vec::default(), - timer: ExpirationQueue::new(settings.timeout), + timer: ExpirationQueue::new(timeout), partitioner, stream: stream.fuse(), } } } -impl PartitionedBatcher +#[cfg(test)] +impl PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, + C: BatchConfig, + F: Fn() -> C + Send, { - pub fn with_timer( - stream: St, - partitioner: Prt, - timer: KT, - batch_item_limit: NonZeroUsize, - batch_allocation_limit: Option, - ) -> Self { + pub fn with_timer(stream: St, partitioner: Prt, timer: KT, settings: F) -> Self { Self { - batch_allocation_limit: batch_allocation_limit - .map_or(usize::max_value(), NonZeroUsize::get), - batch_item_limit: batch_item_limit.get(), + state: settings, batches: HashMap::default(), closed_batches: Vec::default(), timer, @@ -359,13 +252,15 @@ 
where } } -impl Stream for PartitionedBatcher +impl Stream for PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, KT: KeyedTimer, + C: BatchConfig>, + F: Fn() -> C + Send, { type Item = (Prt::Key, Vec); @@ -380,20 +275,20 @@ where return Poll::Ready(this.closed_batches.pop()); } match this.stream.as_mut().poll_next(cx) { - Poll::Pending => match this.timer.poll_expired(cx) { - // Unlike normal streams, `DelayQueue` can return `None` - // here but still be usable later if more entries are added. - Poll::Pending | Poll::Ready(None) => return Poll::Pending, - Poll::Ready(Some(item_key)) => { - let batch = this - .batches - .remove(&item_key) - .expect("batch should exist if it is set to expire"); - this.closed_batches.push((item_key, batch.into_inner())); - - continue; + Poll::Pending => { + match this.timer.poll_expired(cx) { + // Unlike normal streams, `DelayQueue` can return `None` + // here but still be usable later if more entries are added. 
+ Poll::Pending | Poll::Ready(None) => return Poll::Pending, + Poll::Ready(Some(item_key)) => { + let mut batch = this + .batches + .remove(&item_key) + .expect("batch should exist if it is set to expire"); + this.closed_batches.push((item_key, batch.take_batch())); + } } - }, + } Poll::Ready(None) => { // Now that the underlying stream is closed, we need to // clear out our batches, including all expiration @@ -405,7 +300,7 @@ where this.closed_batches.extend( this.batches .drain() - .map(|(key, batch)| (key, batch.into_inner())), + .map(|(key, mut batch)| (key, batch.take_batch())), ); continue; } @@ -413,33 +308,42 @@ where } Poll::Ready(Some(item)) => { let item_key = this.partitioner.partition(&item); - let item_limit: usize = *this.batch_item_limit; - let alloc_limit: usize = *this.batch_allocation_limit; - - if let Some(batch) = this.batches.get_mut(&item_key) { - if batch.has_space(&item) { - // When there's space in the partition batch just - // push the item in and loop back around. - batch.push(item); - } else { - let new_batch = Batch::new(item_limit, alloc_limit).with(item); - let batch = mem::replace(batch, new_batch); - // The batch for this partition key was set to - // expire, but now it's overflowed and must be - // pushed out, so now we reset the batch timeout. - this.timer.insert(item_key.clone()); - - this.closed_batches.push((item_key, batch.into_inner())); - } + // Get the batch for this partition, or create a new one. + let batch = if let Some(batch) = this.batches.get_mut(&item_key) { + batch } else { - // We have no batch yet for this partition key, so - // create one and create the expiration entries as well. - // This allows the batch to expire before filling up, - // and vice versa. 
- let batch = Batch::new(item_limit, alloc_limit).with(item); + let batch = (this.state)(); this.batches.insert(item_key.clone(), batch); - this.timer.insert(item_key); + this.timer.insert(item_key.clone()); + this.batches + .get_mut(&item_key) + .expect("batch has just been inserted so should exist") + }; + + let (fits, metadata) = batch.item_fits_in_batch(&item); + if !fits { + // This batch is too full to accept a new item, so we move the contents of + // the batch into `closed_batches` to be pushed out of this stream on the + // next iteration. + this.closed_batches + .push((item_key.clone(), batch.take_batch())); + + // The batch for this partition key was set to + // expire, but now it's overflowed and must be + // pushed out, so now we reset the batch timeout. + this.timer.insert(item_key.clone()); + } + + // Insert the item into the batch. + batch.push(item, metadata); + if batch.is_batch_full() { + // If the insertion means the batch is now full, we clear out the batch and + // remove it from the list. + this.closed_batches + .push((item_key.clone(), batch.take_batch())); + this.batches.remove(&item_key); + this.timer.remove(&item_key); + } } } } @@ -464,7 +368,10 @@ mod test { use crate::{ partition::Partitioner, - stream::partitioned_batcher::{ExpirationQueue, PartitionedBatcher}, + stream::{ + partitioned_batcher::{ExpirationQueue, PartitionedBatcher}, + BatcherSettings, + }, time::KeyedTimer, }; @@ -498,6 +405,10 @@ mod test { self.valid_keys.insert(item_key); } + fn remove(&mut self, item_key: &u8) { + self.valid_keys.remove(item_key); + } + fn poll_expired(&mut self, _cx: &mut Context) -> Poll> { match self.responses.pop() { Some(Poll::Pending) => unreachable!(), @@ -571,11 +482,12 @@ mod test { // key.
let mut stream = stream::iter(stream.into_iter()); let stream_size_hint = stream.size_hint(); - let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); + let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); + let batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, timer, - item_limit, Some(allocation_limit)); + Box::new(move || batch_settings.as_byte_size_config())); let batcher_size_hint = batcher.size_hint(); assert_eq!(stream_size_hint, batcher_size_hint); @@ -597,9 +509,9 @@ mod test { let mut stream = stream::iter(stream.into_iter()); let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); + let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, - timer, item_limit, - Some(allocation_limit)); + timer, Box::new(move || batch_settings.as_byte_size_config())); let mut batcher = Pin::new(&mut batcher); loop { @@ -668,9 +580,9 @@ mod test { let mut stream = stream::iter(stream.into_iter()); let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); + let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, - timer, item_limit, - Some(allocation_limit)); + timer, Box::new(move || batch_settings.as_byte_size_config())); let mut batcher = Pin::new(&mut batcher); loop { @@ -683,6 +595,7 @@ mod test { let expected_partition = partitions .get_mut(&key) .expect("impossible situation"); + for item in actual_batch { assert_eq!(item, expected_partition.pop().unwrap()); } @@ -711,9 +624,9 @@ mod test { let mut stream = 
stream::iter(stream.into_iter()); let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); + let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, - timer, item_limit, - Some(allocation_limit)); + timer, Box::new(move || batch_settings.as_byte_size_config())); let mut batcher = Pin::new(&mut batcher); let mut observed_items = 0; diff --git a/lib/vector-core/src/time.rs b/lib/vector-core/src/time.rs index 49481842cfcdd..e4e4e4975ebe4 100644 --- a/lib/vector-core/src/time.rs +++ b/lib/vector-core/src/time.rs @@ -21,6 +21,9 @@ pub trait KeyedTimer { /// If the given key already exists in the timer, the underlying subtimer is reset. fn insert(&mut self, item_key: K); + /// Removes a subtimer from the list. + fn remove(&mut self, item_key: &K); + /// Attempts to pull out the next expired subtimer in the queue. 
/// /// The key of the subtimer is returned if it has expired, otherwise, returns `None` if the diff --git a/src/sinks/appsignal/sink.rs b/src/sinks/appsignal/sink.rs index 1ad0e3e0c8779..e066b6dd94a30 100644 --- a/src/sinks/appsignal/sink.rs +++ b/src/sinks/appsignal/sink.rs @@ -34,7 +34,7 @@ where Some(event) }) }) - .batched(self.batch_settings.into_byte_size_config()) + .batched(self.batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), AppsignalRequestBuilder { diff --git a/src/sinks/aws_cloudwatch_logs/sink.rs b/src/sinks/aws_cloudwatch_logs/sink.rs index 3c320ad6236e7..5d71f8d0b605e 100644 --- a/src/sinks/aws_cloudwatch_logs/sink.rs +++ b/src/sinks/aws_cloudwatch_logs/sink.rs @@ -49,7 +49,9 @@ where let age_range = start..end; future::ready(age_range.contains(&req.timestamp)) }) - .batched_partitioned(CloudwatchPartitioner, batcher_settings) + .batched_partitioned(CloudwatchPartitioner, || { + batcher_settings.as_byte_size_config() + }) .map(|(key, events)| { let metadata = RequestMetadata::from_batch( events.iter().map(|req| req.get_metadata().clone()), diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index 61a1e539418aa..6dfb0be08d19e 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -42,6 +42,8 @@ where R: Record + Send + Sync + Unpin + Clone + 'static, { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let batch_settings = self.batch_settings; + input .filter_map(|event| { // Panic: This sink only accepts Logs, so this should never panic @@ -67,7 +69,7 @@ where KinesisPartitioner { _phantom: PhantomData, }, - self.batch_settings, + || batch_settings.as_byte_size_config(), ) .map(|(key, events)| { let metadata = RequestMetadata::from_batch( diff --git a/src/sinks/azure_common/sink.rs b/src/sinks/azure_common/sink.rs index d46910f1e4d98..4844474232cd0 100644 --- a/src/sinks/azure_common/sink.rs +++ 
b/src/sinks/azure_common/sink.rs @@ -42,7 +42,7 @@ where let request_builder = self.request_builder; input - .batched_partitioned(partitioner, settings) + .batched_partitioned(partitioner, || settings.as_byte_size_config()) .filter_map(|(key, batch)| async move { // We don't need to emit an error here if the event is dropped since this will occur if the template // couldn't be rendered during the partitioning. A `TemplateRenderingError` is already emitted when diff --git a/src/sinks/azure_monitor_logs/sink.rs b/src/sinks/azure_monitor_logs/sink.rs index ce06e04bb83b8..20db422f01420 100644 --- a/src/sinks/azure_monitor_logs/sink.rs +++ b/src/sinks/azure_monitor_logs/sink.rs @@ -39,7 +39,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { input - .batched(self.batch_settings.into_byte_size_config()) + .batched(self.batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), AzureMonitorLogsRequestBuilder { diff --git a/src/sinks/clickhouse/sink.rs b/src/sinks/clickhouse/sink.rs index c951101c8ce21..be9c28a0e83b4 100644 --- a/src/sinks/clickhouse/sink.rs +++ b/src/sinks/clickhouse/sink.rs @@ -42,11 +42,12 @@ impl ClickhouseSink { } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let batch_settings = self.batch_settings; + input - .batched_partitioned( - KeyPartitioner::new(self.database, self.table), - self.batch_settings, - ) + .batched_partitioned(KeyPartitioner::new(self.database, self.table), || { + batch_settings.as_byte_size_config() + }) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/databend/sink.rs b/src/sinks/databend/sink.rs index 87b3d1155cc9d..dcb625ce1ffe6 100644 --- a/src/sinks/databend/sink.rs +++ b/src/sinks/databend/sink.rs @@ -24,7 +24,7 @@ impl DatabendSink { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> 
Result<(), ()> { input - .batched(self.batch_settings.into_byte_size_config()) + .batched(self.batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 363fa55383d2b..7b1245edb6389 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -261,8 +261,9 @@ where let default_api_key = Arc::clone(&self.default_api_key); let partitioner = EventPartitioner; + let batch_settings = self.batch_settings; - let input = input.batched_partitioned(partitioner, self.batch_settings); + let input = input.batched_partitioned(partitioner, || batch_settings.as_byte_size_config()); input .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index d04ba43d70bed..0e5fc962b0386 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -85,6 +85,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let mut splitter: MetricSplitter = MetricSplitter::default(); + let batch_settings = self.batch_settings; input // Convert `Event` to `Metric` so we don't have to deal with constant conversions. @@ -99,7 +100,9 @@ where .normalized_with_default::() // We batch metrics by their endpoint: series endpoint for counters, gauge, and sets vs sketch endpoint for // distributions, aggregated histograms, and sketches. - .batched_partitioned(DatadogMetricsTypePartitioner, self.batch_settings) + .batched_partitioned(DatadogMetricsTypePartitioner, || { + batch_settings.as_byte_size_config() + }) // Aggregate counters with identical timestamps, otherwise identical counters (same // series and same timestamp, when rounded to whole seconds) will be dropped in a // last-write-wins situation when they hit the DD metrics intake. 
diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index 4cf502a7eebae..384d7ef635022 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -108,8 +108,10 @@ where } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let batch_settings = self.batch_settings; + input - .batched_partitioned(EventPartitioner, self.batch_settings) + .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config()) .incremental_request_builder(self.request_builder) .flat_map(stream::iter) .filter_map(|request| async move { diff --git a/src/sinks/elasticsearch/sink.rs b/src/sinks/elasticsearch/sink.rs index a3168d5b44809..5d3c043e233ce 100644 --- a/src/sinks/elasticsearch/sink.rs +++ b/src/sinks/elasticsearch/sink.rs @@ -81,7 +81,7 @@ where .filter_map(move |log| { future::ready(process_log(log, &mode, id_key_field, &transformer)) }) - .batched(self.batch_settings.into_byte_size_config()) + .batched(self.batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/gcp/stackdriver/logs/sink.rs b/src/sinks/gcp/stackdriver/logs/sink.rs index 39870bca24432..fbf3e2066d420 100644 --- a/src/sinks/gcp/stackdriver/logs/sink.rs +++ b/src/sinks/gcp/stackdriver/logs/sink.rs @@ -36,10 +36,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { input // Batch the input stream with size calculation based on the estimated encoded json size - .batched( - self.batch_settings - .into_item_size_config(HttpJsonBatchSizer), - ) + .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer)) // Build requests with no concurrency limit. 
.request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/gcp/stackdriver/metrics/sink.rs b/src/sinks/gcp/stackdriver/metrics/sink.rs index 538a666bdd91d..91ba22a640824 100644 --- a/src/sinks/gcp/stackdriver/metrics/sink.rs +++ b/src/sinks/gcp/stackdriver/metrics/sink.rs @@ -66,7 +66,7 @@ where }) }) .normalized_with_default::() - .batched(self.batch_settings.into_byte_size_config()) + .batched(self.batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/gcs_common/sink.rs b/src/sinks/gcs_common/sink.rs index 830dbd4ca6301..1f1971a15152e 100644 --- a/src/sinks/gcs_common/sink.rs +++ b/src/sinks/gcs_common/sink.rs @@ -45,7 +45,7 @@ where let request_builder = self.request_builder; input - .batched_partitioned(partitioner, settings) + .batched_partitioned(partitioner, || settings.as_byte_size_config()) .filter_map(|(key, batch)| async move { // A `TemplateRenderingError` will have been emitted by `KeyPartitioner` if the key here is `None`, // thus no further `EventsDropped` event needs emitting at this stage. 
diff --git a/src/sinks/greptimedb/sink.rs b/src/sinks/greptimedb/sink.rs index fc8e75140708d..b4b36a10bf5db 100644 --- a/src/sinks/greptimedb/sink.rs +++ b/src/sinks/greptimedb/sink.rs @@ -37,7 +37,7 @@ impl GreptimeDBSink { .normalized_with_default::() .batched( self.batch_settings - .into_item_size_config(GreptimeDBBatchSizer), + .as_item_size_config(GreptimeDBBatchSizer), ) .map(GreptimeDBRequest::from_metrics) .into_driver(self.service) diff --git a/src/sinks/honeycomb/sink.rs b/src/sinks/honeycomb/sink.rs index 235dcd6eb4893..fe2bf4cdfc598 100644 --- a/src/sinks/honeycomb/sink.rs +++ b/src/sinks/honeycomb/sink.rs @@ -36,10 +36,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { input // Batch the input stream with size calculation based on the estimated encoded json size - .batched( - self.batch_settings - .into_item_size_config(HttpJsonBatchSizer), - ) + .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer)) // Build requests with default concurrency limit. .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/http/sink.rs b/src/sinks/http/sink.rs index 8427201b31662..575e558e2d013 100644 --- a/src/sinks/http/sink.rs +++ b/src/sinks/http/sink.rs @@ -33,7 +33,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { input // Batch the input stream with size calculation based on the configured codec - .batched(self.batch_settings.into_item_size_config(HttpBatchSizer { + .batched(self.batch_settings.as_item_size_config(HttpBatchSizer { encoder: self.request_builder.encoder.encoder.clone(), })) // Build requests with default concurrency limit. 
diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 9d9046788d04b..1b0d45bb3092b 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -453,12 +453,13 @@ impl LokiSink { NonZeroUsize::new(1).expect("static") } }; + let batch_settings = self.batch_settings; input .map(|event| encoder.encode_event(event)) .filter_map(|event| async { event }) .map(|record| filter.filter_record(record)) - .batched_partitioned(RecordPartitioner, self.batch_settings) + .batched_partitioned(RecordPartitioner, || batch_settings.as_byte_size_config()) .filter_map(|(partition, batch)| async { if let Some(partition) = partition { let mut count: usize = 0; diff --git a/src/sinks/new_relic/sink.rs b/src/sinks/new_relic/sink.rs index fd93c90bbd61b..e8976e9131026 100644 --- a/src/sinks/new_relic/sink.rs +++ b/src/sinks/new_relic/sink.rs @@ -123,7 +123,7 @@ where let protocol = get_http_scheme_from_uri(&self.credentials.get_uri()); input - .batched(self.batcher_settings.into_byte_size_config()) + .batched(self.batcher_settings.as_byte_size_config()) .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map( |request: Result| async move { diff --git a/src/sinks/opendal_common.rs b/src/sinks/opendal_common.rs index 3ff05f214f1c7..34b4962771835 100644 --- a/src/sinks/opendal_common.rs +++ b/src/sinks/opendal_common.rs @@ -78,7 +78,7 @@ where let request_builder = self.request_builder; input - .batched_partitioned(partitioner, settings) + .batched_partitioned(partitioner, || settings.as_byte_size_config()) .filter_map(|(key, batch)| async move { // We don't need to emit an error here if the event is dropped since this will occur if the template // couldn't be rendered during the partitioning. 
A `TemplateRenderingError` is already emitted when diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs index a9dcad68682ff..128ce05adddef 100644 --- a/src/sinks/redis/sink.rs +++ b/src/sinks/redis/sink.rs @@ -86,7 +86,7 @@ impl RedisSink { let mut encoder = self.encoder.clone(); let transformer = self.transformer.clone(); - let batcher_settings = self.batcher_settings.into_byte_size_config(); + let batcher_settings = self.batcher_settings.as_byte_size_config(); input .filter_map(|event| future::ready(self.make_redis_event(event))) diff --git a/src/sinks/s3_common/sink.rs b/src/sinks/s3_common/sink.rs index c26f6f93e8a0f..fd6733a484a75 100644 --- a/src/sinks/s3_common/sink.rs +++ b/src/sinks/s3_common/sink.rs @@ -40,11 +40,10 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let partitioner = self.partitioner; let settings = self.batcher_settings; - let request_builder = self.request_builder; input - .batched_partitioned(partitioner, settings) + .batched_partitioned(partitioner, || settings.as_byte_size_config()) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index 555a3d9b95076..7fed3bf3310e0 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -61,6 +61,7 @@ where timestamp_key: self.timestamp_key.clone(), endpoint_target: self.endpoint_target, }; + let batch_settings = self.batch_settings; input .map(move |event| process_log(event, &data)) @@ -77,7 +78,7 @@ where } else { EventPartitioner::new(None, None, None, None) }, - self.batch_settings, + || batch_settings.as_byte_size_config(), ) .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/splunk_hec/metrics/sink.rs b/src/sinks/splunk_hec/metrics/sink.rs index 
ef42d21b60370..6f6ad5d74b9dd 100644 --- a/src/sinks/splunk_hec/metrics/sink.rs +++ b/src/sinks/splunk_hec/metrics/sink.rs @@ -39,6 +39,7 @@ where let index = self.index.as_ref(); let host_key = self.host_key.as_ref(); let default_namespace = self.default_namespace.as_deref(); + let batch_settings = self.batch_settings; input .map(|event| (event.size_of(), event.into_metric())) @@ -53,7 +54,7 @@ where default_namespace, )) }) - .batched_partitioned(EventPartitioner, self.batch_settings) + .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/statsd/sink.rs b/src/sinks/statsd/sink.rs index a9f0bdf44a9e2..f5c77e91b4f86 100644 --- a/src/sinks/statsd/sink.rs +++ b/src/sinks/statsd/sink.rs @@ -58,7 +58,7 @@ where // other metric types in type-specific ways i.e. incremental gauge updates use a // different syntax, etc. .normalized_with_default::() - .batched(self.batch_settings.into_item_size_config(StatsdBatchSizer)) + .batched(self.batch_settings.as_item_size_config(StatsdBatchSizer)) // We build our requests "incrementally", which means that for a single batch of // metrics, we might generate N requests to represent all of the metrics in the batch. // diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index 617697ea84ec1..a25f0badb6f70 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -17,8 +17,7 @@ use vector_core::{ partition::Partitioner, stream::{ batcher::{config::BatchConfig, Batcher}, - BatcherSettings, ConcurrentMap, Driver, DriverResponse, ExpirationQueue, - PartitionedBatcher, + ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher, }, ByteSizeOf, }; @@ -45,16 +44,18 @@ pub trait SinkBuilderExt: Stream { /// The stream will yield batches of events, with their partition key, when either a batch fills /// up or times out. 
[`Partitioner`] operates on a per-event basis, and has access to the event /// itself, and so can access any and all fields of an event. - fn batched_partitioned

( + fn batched_partitioned( self, partitioner: P, - settings: BatcherSettings, - ) -> PartitionedBatcher> + settings: F, + ) -> PartitionedBatcher, C, F> where Self: Stream + Sized, P: Partitioner + Unpin, P::Key: Eq + Hash + Clone, P::Item: ByteSizeOf, + C: BatchConfig, + F: Fn() -> C + Send, { PartitionedBatcher::new(self, partitioner, settings) }