From b439288d3bf7b5256e73bc1560b248d21a211f4e Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 6 Oct 2023 12:04:53 +0100 Subject: [PATCH 01/10] Update PartitionBatcher to use BatchConfig Signed-off-by: Stephen Wakely --- .../src/stream/partitioned_batcher.rs | 332 +++++++++--------- src/sinks/aws_cloudwatch_logs/sink.rs | 5 +- src/sinks/aws_kinesis/sink.rs | 4 +- src/sinks/azure_common/sink.rs | 7 +- src/sinks/clickhouse/sink.rs | 4 +- src/sinks/datadog/logs/sink.rs | 6 +- src/sinks/datadog/metrics/sink.rs | 6 +- src/sinks/datadog/traces/sink.rs | 7 +- src/sinks/gcs_common/sink.rs | 5 +- src/sinks/loki/sink.rs | 6 +- src/sinks/opendal_common.rs | 7 +- src/sinks/s3_common/sink.rs | 7 +- src/sinks/splunk_hec/logs/sink.rs | 3 +- src/sinks/splunk_hec/metrics/sink.rs | 6 +- src/sinks/util/builder.rs | 10 +- 15 files changed, 226 insertions(+), 189 deletions(-) diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs index aa2f7e6b58bcd..165c3ad3d35ff 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-core/src/stream/partitioned_batcher.rs @@ -1,8 +1,6 @@ use std::{ - cmp, collections::HashMap, hash::{BuildHasherDefault, Hash}, - mem, num::NonZeroUsize, pin::Pin, task::{ready, Context, Poll}, @@ -25,6 +23,8 @@ use crate::{ ByteSizeOf, }; +use super::batcher::BatchConfig; + /// A `KeyedTimer` based on `DelayQueue`. pub struct ExpirationQueue { /// The timeout to give each new key entry @@ -100,114 +100,6 @@ where } } -/// A batch for use by `Batcher` -/// -/// This structure is a private implementation detail that simplifies the -/// implementation of `Batcher`. It is the actual store of items that come -/// through the stream manipulated by `Batcher` plus limit information to signal -/// when the `Batch` is full. -struct Batch { - /// The total number of `I` bytes stored, does not any overhead in this - /// structure. 
- allocated_bytes: usize, - /// The maximum number of elements allowed in this structure. - element_limit: usize, - /// The maximum number of allocated bytes (not including overhead) allowed - /// in this structure. - allocation_limit: usize, - /// The store of `I` elements. - elements: Vec, -} - -impl ByteSizeOf for Batch { - fn allocated_bytes(&self) -> usize { - self.allocated_bytes - } -} - -impl Batch -where - I: ByteSizeOf, -{ - /// Create a new Batch instance - /// - /// Creates a new batch instance with specific element and allocation limits. The element limit - /// is a maximum cap on the number of `I` instances. The allocation limit is a soft-max on the - /// number of allocated bytes stored in this batch, not taking into account overhead from this - /// structure itself. - /// - /// If `allocation_limit` is smaller than the size of `I` as reported by `std::mem::size_of`, - /// then the allocation limit will be raised such that the batch can hold a single instance of - /// `I`. Likewise, `element_limit` will be raised such that it is always at least 1, ensuring - /// that a new batch can be pushed into. - fn new(element_limit: usize, allocation_limit: usize) -> Self { - // SAFETY: `element_limit` is always non-zero because `BatcherSettings` can only be - // constructed with `NonZeroUsize` versions of allocation limit/item limit. `Batch` is also - // only constructable via `Batcher`. - - // TODO: This may need to be reworked, because it's subtly wrong as-is. - // ByteSizeOf::size_of() always returns the size of the type itself, plus any "allocated - // bytes". Thus, there are times when an item will be bigger than simply the size of the - // type itself (aka mem::size_of::()) and thus than type of item would never fit in a - // batch where the `allocation_limit` is at or lower than the size of that item. - // - // We're counteracting this here by ensuring that the element limit is always at least 1. 
- let allocation_limit = cmp::max(allocation_limit, mem::size_of::()); - Self { - allocated_bytes: 0, - element_limit, - allocation_limit, - elements: Vec::with_capacity(128), - } - } - - /// Unconditionally insert an element into the batch - /// - /// This function is similar to `push` except that the caller does not need - /// to call `has_space` prior to calling this and it will never - /// panic. Intended to be used only when insertion must not fail. - fn with(mut self, value: I) -> Self { - self.allocated_bytes += value.size_of(); - self.elements.push(value); - self - } - - /// Decompose the batch - /// - /// Called by the user when they want to get at the internal store of - /// items. Returns a tuple, the first element being the allocated size of - /// stored items and the second the store of items. - fn into_inner(self) -> Vec { - self.elements - } - - /// Whether the batch has space for a new item - /// - /// This function returns true of there is space both in terms of item count - /// and byte count for the given item, false otherwise. - fn has_space(&self, value: &I) -> bool { - let too_many_elements = self.elements.len() + 1 > self.element_limit; - let too_many_bytes = self.allocated_bytes + value.size_of() > self.allocation_limit; - !(too_many_elements || too_many_bytes) - } - - /// Push an element into the batch - /// - /// This function pushes an element into the batch. Callers must be sure to - /// call `has_space` prior to calling this function and receive a positive - /// result. - /// - /// # Panics - /// - /// This function will panic if there is not sufficient space in the batch - /// for a new element to be inserted. - fn push(&mut self, value: I) { - assert!(self.has_space(&value)); - self.allocated_bytes += value.size_of(); - self.elements.push(value); - } -} - /// Controls the behavior of the batcher in terms of batch size and flush interval. 
/// /// This is a temporary solution for pushing in a fixed settings structure so we don't have to worry @@ -287,18 +179,16 @@ impl BatcherSettings { } #[pin_project] -pub struct PartitionedBatcher +pub struct PartitionedBatcher where Prt: Partitioner, { - /// The total number of bytes a single batch in this struct is allowed to - /// hold. - batch_allocation_limit: usize, - /// The maximum number of items that are allowed per-batch - batch_item_limit: usize, + /// A closure that retrievs a new `BatchConfig` when needed to batch a + /// new partition. + state: Box C + Send>, /// The store of live batches. Note that the key here is an option type, /// on account of the interface of `Prt`. - batches: HashMap, BuildHasherDefault>, + batches: HashMap>, /// The store of 'closed' batches. When this is not empty it will be /// preferentially flushed prior to consuming any new items from the /// underlying stream. @@ -312,44 +202,44 @@ where stream: Fuse, } -impl PartitionedBatcher> +impl PartitionedBatcher, C> where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, + C: BatchConfig, { - pub fn new(stream: St, partitioner: Prt, settings: BatcherSettings) -> Self { + pub fn new(stream: St, partitioner: Prt, settings: Box C + Send>) -> Self { + let timeout = settings().timeout(); Self { - batch_allocation_limit: settings.size_limit, - batch_item_limit: settings.item_limit, + state: settings, batches: HashMap::default(), closed_batches: Vec::default(), - timer: ExpirationQueue::new(settings.timeout), + timer: ExpirationQueue::new(timeout), partitioner, stream: stream.fuse(), } } } -impl PartitionedBatcher +#[cfg(test)] +impl PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, + C: BatchConfig, { pub fn with_timer( stream: St, partitioner: Prt, timer: KT, - batch_item_limit: NonZeroUsize, - batch_allocation_limit: Option, + settings: Box C + Send>, ) -> Self { Self { - 
batch_allocation_limit: batch_allocation_limit - .map_or(usize::max_value(), NonZeroUsize::get), - batch_item_limit: batch_item_limit.get(), + state: settings, batches: HashMap::default(), closed_batches: Vec::default(), timer, @@ -359,13 +249,14 @@ where } } -impl Stream for PartitionedBatcher +impl Stream for PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, KT: KeyedTimer, + C: BatchConfig>, { type Item = (Prt::Key, Vec); @@ -385,13 +276,11 @@ where // here but still be usable later if more entries are added. Poll::Pending | Poll::Ready(None) => return Poll::Pending, Poll::Ready(Some(item_key)) => { - let batch = this + let mut batch = this .batches .remove(&item_key) .expect("batch should exist if it is set to expire"); - this.closed_batches.push((item_key, batch.into_inner())); - - continue; + this.closed_batches.push((item_key, batch.take_batch())); } }, Poll::Ready(None) => { @@ -405,7 +294,7 @@ where this.closed_batches.extend( this.batches .drain() - .map(|(key, batch)| (key, batch.into_inner())), + .map(|(key, mut batch)| (key, batch.take_batch())), ); continue; } @@ -413,33 +302,37 @@ where } Poll::Ready(Some(item)) => { let item_key = this.partitioner.partition(&item); - let item_limit: usize = *this.batch_item_limit; - let alloc_limit: usize = *this.batch_allocation_limit; - - if let Some(batch) = this.batches.get_mut(&item_key) { - if batch.has_space(&item) { - // When there's space in the partition batch just - // push the item in and loop back around. - batch.push(item); - } else { - let new_batch = Batch::new(item_limit, alloc_limit).with(item); - let batch = mem::replace(batch, new_batch); - // The batch for this partition key was set to - // expire, but now it's overflowed and must be - // pushed out, so now we reset the batch timeout. 
- this.timer.insert(item_key.clone()); - - this.closed_batches.push((item_key, batch.into_inner())); + // Get the batch for this partition, or create a new one. + let batch = match this.batches.get_mut(&item_key) { + Some(batch) => batch, + None => { + let batch = (this.state)(); + this.batches.insert(item_key.clone(), batch); + this.batches + .get_mut(&item_key) + .expect("batch has just been inserted so should exist") } - } else { - // We have no batch yet for this partition key, so - // create one and create the expiration entries as well. - // This allows the batch to expire before filling up, - // and vice versa. - let batch = Batch::new(item_limit, alloc_limit).with(item); - this.batches.insert(item_key.clone(), batch); - this.timer.insert(item_key); + }; + + let (fits, metadata) = batch.item_fits_in_batch(&item); + if !fits { + // This batch is too full to accept a new item, so we move the contents of + // the batch into `closed_batches` to be push out of this stream on the + // next iteration. + this.closed_batches + .push((item_key.clone(), batch.take_batch())); + } + + // Insert the item into the batch. + batch.push(item, metadata); + this.timer.insert(item_key.clone()); + if batch.is_batch_full() { + // If the insertion means the batch is now full, we clear out the batch and + // remove it from the list. + this.closed_batches + .push((item_key.clone(), batch.take_batch())); + this.batches.remove(&item_key); } } } @@ -464,7 +357,10 @@ mod test { use crate::{ partition::Partitioner, - stream::partitioned_batcher::{ExpirationQueue, PartitionedBatcher}, + stream::{ + partitioned_batcher::{ExpirationQueue, PartitionedBatcher}, + BatcherSettings, + }, time::KeyedTimer, }; @@ -571,11 +467,12 @@ mod test { // key. 
let mut stream = stream::iter(stream.into_iter()); let stream_size_hint = stream.size_hint(); - let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); + let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); + let batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, timer, - item_limit, Some(allocation_limit)); + Box::new(move || batch_settings.into_byte_size_config())); let batcher_size_hint = batcher.size_hint(); assert_eq!(stream_size_hint, batcher_size_hint); @@ -597,9 +494,9 @@ mod test { let mut stream = stream::iter(stream.into_iter()); let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); + let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, - timer, item_limit, - Some(allocation_limit)); + timer, Box::new(move || batch_settings.into_byte_size_config())); let mut batcher = Pin::new(&mut batcher); loop { @@ -668,9 +565,9 @@ mod test { let mut stream = stream::iter(stream.into_iter()); let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); + let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, - timer, item_limit, - Some(allocation_limit)); + timer, Box::new(move || batch_settings.clone().into_byte_size_config())); let mut batcher = Pin::new(&mut batcher); loop { @@ -683,6 +580,10 @@ mod test { let expected_partition = partitions .get_mut(&key) .expect("impossible situation"); + + dbg!(&actual_batch); + dbg!(&expected_partition); + for item in actual_batch { assert_eq!(item, 
expected_partition.pop().unwrap()); } @@ -695,6 +596,99 @@ mod test { } } + #[tokio::test] + async fn zorkwonk() { + // let stream = (0..20).collect::>(); + // let stream = vec![ + // 7037641852729981347, + // 8644263311095074276, + // 4920834891500558744, + // 15150872024001141310, + // 12803461249171200340, + // ]; + let stream = vec![ + 7037641852729981347, + 8644263311095074276, + 4920834891500558744, + 15150872024001141310, + 12803461249171200340, + ]; + let item_limit = 1; + let allocation_limit = 8; + let partitioner = TestPartitioner { + key_space: NonZeroU8::new(23).unwrap(), + }; + let mut count = 0; + let timer = TestTimer::new( + std::iter::from_fn(move || { + if count > 254 { + None + } else { + count += 1; + if count % 5 == 0 { + Some(Poll::Ready(Some(count))) + } else { + Some(Poll::Ready(None)) + } + } + }) + .collect(), + ); + + // Asserts that for every received batch received the elements in + // the batch are not reordered within a batch. No claim is made on + // when batches themselves will issue, batch sizes etc. 
+ let noop_waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&noop_waker); + + let mut partitions = separate_partitions(stream.clone(), &partitioner); + dbg!(&partitions); + + let mut stream = stream::iter(stream.into_iter()); + let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); + let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); + let batch_settings = + BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); + + let mut batcher = PartitionedBatcher::with_timer( + &mut stream, + partitioner, + timer, + Box::new(move || batch_settings.clone().into_byte_size_config()), + ); + // let mut batcher = PartitionedBatcher::new( + // &mut stream, + // partitioner, + // // timer, + // Box::new(move || batch_settings.clone().into_byte_size_config()), + // ); + let mut batcher = Pin::new(&mut batcher); + + loop { + match batcher.as_mut().poll_next(&mut cx) { + Poll::Pending => {} + Poll::Ready(None) => { + break; + } + Poll::Ready(Some((key, actual_batch))) => { + let expected_partition = + partitions.get_mut(&key).expect("impossible situation"); + + dbg!(&actual_batch); + dbg!(&expected_partition); + + for item in actual_batch { + assert_eq!(item, expected_partition.pop().unwrap()); + } + } + } + } + for v in partitions.values() { + assert!(v.is_empty()); + } + // panic!(); + } + proptest! 
{ #[test] fn batch_does_not_lose_items(stream: Vec, @@ -711,9 +705,11 @@ mod test { let mut stream = stream::iter(stream.into_iter()); let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); + let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, - timer, item_limit, - Some(allocation_limit)); + timer, Box::new(move || batch_settings.clone().into_byte_size_config())); + // let mut batcher = PartitionedBatcher::new(&mut stream, partitioner, + // Box::new(move || batch_settings.clone().into_byte_size_config())); let mut batcher = Pin::new(&mut batcher); let mut observed_items = 0; diff --git a/src/sinks/aws_cloudwatch_logs/sink.rs b/src/sinks/aws_cloudwatch_logs/sink.rs index 3c320ad6236e7..80672b3b55253 100644 --- a/src/sinks/aws_cloudwatch_logs/sink.rs +++ b/src/sinks/aws_cloudwatch_logs/sink.rs @@ -49,7 +49,10 @@ where let age_range = start..end; future::ready(age_range.contains(&req.timestamp)) }) - .batched_partitioned(CloudwatchPartitioner, batcher_settings) + .batched_partitioned( + CloudwatchPartitioner, + Box::new(move || batcher_settings.clone().into_byte_size_config()), + ) .map(|(key, events)| { let metadata = RequestMetadata::from_batch( events.iter().map(|req| req.get_metadata().clone()), diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index 61a1e539418aa..eb94a159e19c2 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -42,6 +42,8 @@ where R: Record + Send + Sync + Unpin + Clone + 'static, { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let batch_settings = self.batch_settings.clone(); + input .filter_map(|event| { // Panic: This sink only accepts Logs, so this should never panic @@ -67,7 +69,7 @@ where KinesisPartitioner { _phantom: PhantomData, }, - 
self.batch_settings, + Box::new(move || batch_settings.clone().into_byte_size_config()), ) .map(|(key, events)| { let metadata = RequestMetadata::from_batch( diff --git a/src/sinks/azure_common/sink.rs b/src/sinks/azure_common/sink.rs index d46910f1e4d98..a97b669c1134f 100644 --- a/src/sinks/azure_common/sink.rs +++ b/src/sinks/azure_common/sink.rs @@ -37,12 +37,15 @@ where { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let partitioner = self.partitioner; - let settings = self.batcher_settings; + let settings = self.batcher_settings.clone(); let request_builder = self.request_builder; input - .batched_partitioned(partitioner, settings) + .batched_partitioned( + partitioner, + Box::new(move || settings.clone().into_byte_size_config()), + ) .filter_map(|(key, batch)| async move { // We don't need to emit an error here if the event is dropped since this will occur if the template // couldn't be rendered during the partitioning. A `TemplateRenderingError` is already emitted when diff --git a/src/sinks/clickhouse/sink.rs b/src/sinks/clickhouse/sink.rs index c951101c8ce21..949b2748fc1fa 100644 --- a/src/sinks/clickhouse/sink.rs +++ b/src/sinks/clickhouse/sink.rs @@ -42,10 +42,12 @@ impl ClickhouseSink { } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let batch_settings = self.batch_settings.clone(); + input .batched_partitioned( KeyPartitioner::new(self.database, self.table), - self.batch_settings, + Box::new(move || batch_settings.clone().into_byte_size_config()), ) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder( diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 363fa55383d2b..6055866e3e85e 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -261,8 +261,12 @@ where let default_api_key = Arc::clone(&self.default_api_key); let partitioner = EventPartitioner; + let batch_settings = 
self.batch_settings.clone(); - let input = input.batched_partitioned(partitioner, self.batch_settings); + let input = input.batched_partitioned( + partitioner, + Box::new(move || batch_settings.clone().into_byte_size_config()), + ); input .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index fccfe040fdd95..e7da4f2036015 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -84,6 +84,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let mut splitter: MetricSplitter = MetricSplitter::default(); + let batch_settings = self.batch_settings.clone(); input // Convert `Event` to `Metric` so we don't have to deal with constant conversions. @@ -98,7 +99,10 @@ where .normalized_with_default::() // We batch metrics by their endpoint: series endpoint for counters, gauge, and sets vs sketch endpoint for // distributions, aggregated histograms, and sketches. - .batched_partitioned(DatadogMetricsTypePartitioner, self.batch_settings) + .batched_partitioned( + DatadogMetricsTypePartitioner, + Box::new(move || batch_settings.clone().into_byte_size_config()), + ) // Aggregate counters with identical timestamps, otherwise identical counters (same // series and same timestamp, when rounded to whole seconds) will be dropped in a // last-write-wins situation when they hit the DD metrics intake. 
diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index 4cf502a7eebae..c28b053089297 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -108,8 +108,13 @@ where } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + let batch_settings = self.batch_settings.clone(); + input - .batched_partitioned(EventPartitioner, self.batch_settings) + .batched_partitioned( + EventPartitioner, + Box::new(move || batch_settings.clone().into_byte_size_config()), + ) .incremental_request_builder(self.request_builder) .flat_map(stream::iter) .filter_map(|request| async move { diff --git a/src/sinks/gcs_common/sink.rs b/src/sinks/gcs_common/sink.rs index 830dbd4ca6301..7955d064052e0 100644 --- a/src/sinks/gcs_common/sink.rs +++ b/src/sinks/gcs_common/sink.rs @@ -45,7 +45,10 @@ where let request_builder = self.request_builder; input - .batched_partitioned(partitioner, settings) + .batched_partitioned( + partitioner, + Box::new(move || settings.clone().into_byte_size_config()), + ) .filter_map(|(key, batch)| async move { // A `TemplateRenderingError` will have been emitted by `KeyPartitioner` if the key here is `None`, // thus no further `EventsDropped` event needs emitting at this stage. 
diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 9d9046788d04b..4619be1370dd6 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -453,12 +453,16 @@ impl LokiSink { NonZeroUsize::new(1).expect("static") } }; + let batch_settings = self.batch_settings.clone(); input .map(|event| encoder.encode_event(event)) .filter_map(|event| async { event }) .map(|record| filter.filter_record(record)) - .batched_partitioned(RecordPartitioner, self.batch_settings) + .batched_partitioned( + RecordPartitioner, + Box::new(move || batch_settings.clone().into_byte_size_config()), + ) .filter_map(|(partition, batch)| async { if let Some(partition) = partition { let mut count: usize = 0; diff --git a/src/sinks/opendal_common.rs b/src/sinks/opendal_common.rs index 3ff05f214f1c7..ba94b05e145d0 100644 --- a/src/sinks/opendal_common.rs +++ b/src/sinks/opendal_common.rs @@ -73,12 +73,15 @@ where { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let partitioner = self.partitioner; - let settings = self.batcher_settings; + let settings = self.batcher_settings.clone(); let request_builder = self.request_builder; input - .batched_partitioned(partitioner, settings) + .batched_partitioned( + partitioner, + Box::new(move || settings.clone().into_byte_size_config()), + ) .filter_map(|(key, batch)| async move { // We don't need to emit an error here if the event is dropped since this will occur if the template // couldn't be rendered during the partitioning. 
A `TemplateRenderingError` is already emitted when diff --git a/src/sinks/s3_common/sink.rs b/src/sinks/s3_common/sink.rs index c26f6f93e8a0f..57bf735fbb687 100644 --- a/src/sinks/s3_common/sink.rs +++ b/src/sinks/s3_common/sink.rs @@ -39,12 +39,15 @@ where { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let partitioner = self.partitioner; - let settings = self.batcher_settings; + let settings = self.batcher_settings.clone(); let request_builder = self.request_builder; input - .batched_partitioned(partitioner, settings) + .batched_partitioned( + partitioner, + Box::new(move || settings.clone().into_byte_size_config()), + ) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index 555a3d9b95076..9ba19c5ffcad8 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -61,6 +61,7 @@ where timestamp_key: self.timestamp_key.clone(), endpoint_target: self.endpoint_target, }; + let batch_settings = self.batch_settings.clone(); input .map(move |event| process_log(event, &data)) @@ -77,7 +78,7 @@ where } else { EventPartitioner::new(None, None, None, None) }, - self.batch_settings, + Box::new(move || batch_settings.clone().into_byte_size_config()), ) .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/splunk_hec/metrics/sink.rs b/src/sinks/splunk_hec/metrics/sink.rs index ef42d21b60370..f5e07c18d73fa 100644 --- a/src/sinks/splunk_hec/metrics/sink.rs +++ b/src/sinks/splunk_hec/metrics/sink.rs @@ -39,6 +39,7 @@ where let index = self.index.as_ref(); let host_key = self.host_key.as_ref(); let default_namespace = self.default_namespace.as_deref(); + let batch_settings = self.batch_settings.clone(); input .map(|event| (event.size_of(), event.into_metric())) @@ -53,7 
+54,10 @@ where default_namespace, )) }) - .batched_partitioned(EventPartitioner, self.batch_settings) + .batched_partitioned( + EventPartitioner, + Box::new(move || batch_settings.clone().into_byte_size_config()), + ) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index 617697ea84ec1..30025bbdf9bf1 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -17,8 +17,7 @@ use vector_core::{ partition::Partitioner, stream::{ batcher::{config::BatchConfig, Batcher}, - BatcherSettings, ConcurrentMap, Driver, DriverResponse, ExpirationQueue, - PartitionedBatcher, + ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher, }, ByteSizeOf, }; @@ -45,16 +44,17 @@ pub trait SinkBuilderExt: Stream { /// The stream will yield batches of events, with their partition key, when either a batch fills /// up or times out. [`Partitioner`] operates on a per-event basis, and has access to the event /// itself, and so can access any and all fields of an event. - fn batched_partitioned

( + fn batched_partitioned( self, partitioner: P, - settings: BatcherSettings, - ) -> PartitionedBatcher> + settings: Box C + Send>, + ) -> PartitionedBatcher, C> where Self: Stream + Sized, P: Partitioner + Unpin, P::Key: Eq + Hash + Clone, P::Item: ByteSizeOf, + C: BatchConfig, { PartitionedBatcher::new(self, partitioner, settings) } From cc81a6838eec43b15faa357f06a546abfea09512 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 6 Oct 2023 12:27:46 +0100 Subject: [PATCH 02/10] Remove zorkwonk Signed-off-by: Stephen Wakely --- .../src/stream/partitioned_batcher.rs | 93 ------------------- 1 file changed, 93 deletions(-) diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs index 165c3ad3d35ff..400d74aa19188 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-core/src/stream/partitioned_batcher.rs @@ -596,99 +596,6 @@ mod test { } } - #[tokio::test] - async fn zorkwonk() { - // let stream = (0..20).collect::>(); - // let stream = vec![ - // 7037641852729981347, - // 8644263311095074276, - // 4920834891500558744, - // 15150872024001141310, - // 12803461249171200340, - // ]; - let stream = vec![ - 7037641852729981347, - 8644263311095074276, - 4920834891500558744, - 15150872024001141310, - 12803461249171200340, - ]; - let item_limit = 1; - let allocation_limit = 8; - let partitioner = TestPartitioner { - key_space: NonZeroU8::new(23).unwrap(), - }; - let mut count = 0; - let timer = TestTimer::new( - std::iter::from_fn(move || { - if count > 254 { - None - } else { - count += 1; - if count % 5 == 0 { - Some(Poll::Ready(Some(count))) - } else { - Some(Poll::Ready(None)) - } - } - }) - .collect(), - ); - - // Asserts that for every received batch received the elements in - // the batch are not reordered within a batch. No claim is made on - // when batches themselves will issue, batch sizes etc. 
- let noop_waker = futures::task::noop_waker(); - let mut cx = Context::from_waker(&noop_waker); - - let mut partitions = separate_partitions(stream.clone(), &partitioner); - dbg!(&partitions); - - let mut stream = stream::iter(stream.into_iter()); - let item_limit = NonZeroUsize::new(item_limit as usize).unwrap(); - let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); - let batch_settings = - BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); - - let mut batcher = PartitionedBatcher::with_timer( - &mut stream, - partitioner, - timer, - Box::new(move || batch_settings.clone().into_byte_size_config()), - ); - // let mut batcher = PartitionedBatcher::new( - // &mut stream, - // partitioner, - // // timer, - // Box::new(move || batch_settings.clone().into_byte_size_config()), - // ); - let mut batcher = Pin::new(&mut batcher); - - loop { - match batcher.as_mut().poll_next(&mut cx) { - Poll::Pending => {} - Poll::Ready(None) => { - break; - } - Poll::Ready(Some((key, actual_batch))) => { - let expected_partition = - partitions.get_mut(&key).expect("impossible situation"); - - dbg!(&actual_batch); - dbg!(&expected_partition); - - for item in actual_batch { - assert_eq!(item, expected_partition.pop().unwrap()); - } - } - } - } - for v in partitions.values() { - assert!(v.is_empty()); - } - // panic!(); - } - proptest! 
{ #[test] fn batch_does_not_lose_items(stream: Vec, From 830272510eb37815f6479a68eac08cf78114e79e Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 6 Oct 2023 12:41:07 +0100 Subject: [PATCH 03/10] Make into fns as fns instead Signed-off-by: Stephen Wakely --- lib/vector-core/src/stream/batcher/mod.rs | 9 +++------ .../src/stream/partitioned_batcher.rs | 18 ++++++++---------- src/sinks/appsignal/sink.rs | 2 +- src/sinks/aws_cloudwatch_logs/sink.rs | 2 +- src/sinks/aws_kinesis/sink.rs | 2 +- src/sinks/azure_common/sink.rs | 2 +- src/sinks/azure_monitor_logs/sink.rs | 2 +- src/sinks/clickhouse/sink.rs | 2 +- src/sinks/databend/sink.rs | 2 +- src/sinks/datadog/logs/sink.rs | 2 +- src/sinks/datadog/metrics/sink.rs | 2 +- src/sinks/datadog/traces/sink.rs | 2 +- src/sinks/elasticsearch/sink.rs | 2 +- src/sinks/gcp/stackdriver/logs/sink.rs | 5 +---- src/sinks/gcs_common/sink.rs | 2 +- src/sinks/greptimedb/sink.rs | 2 +- src/sinks/honeycomb/sink.rs | 5 +---- src/sinks/http/sink.rs | 2 +- src/sinks/loki/sink.rs | 2 +- src/sinks/new_relic/sink.rs | 2 +- src/sinks/opendal_common.rs | 2 +- src/sinks/redis/sink.rs | 2 +- src/sinks/s3_common/sink.rs | 2 +- src/sinks/splunk_hec/logs/sink.rs | 2 +- src/sinks/splunk_hec/metrics/sink.rs | 2 +- src/sinks/statsd/sink.rs | 2 +- 26 files changed, 35 insertions(+), 46 deletions(-) diff --git a/lib/vector-core/src/stream/batcher/mod.rs b/lib/vector-core/src/stream/batcher/mod.rs index a7153a5a8665d..bfb20e95e4160 100644 --- a/lib/vector-core/src/stream/batcher/mod.rs +++ b/lib/vector-core/src/stream/batcher/mod.rs @@ -129,10 +129,7 @@ mod test { NonZeroUsize::new(10000).unwrap(), NonZeroUsize::new(2).unwrap(), ); - let batcher = Batcher::new( - stream, - settings.into_item_size_config(|x: &u32| *x as usize), - ); + let batcher = Batcher::new(stream, settings.as_item_size_config(|x: &u32| *x as usize)); let batches: Vec<_> = batcher.collect().await; assert_eq!(batches, vec![vec![1, 2], vec![3],]); } @@ -146,7 +143,7 @@ mod test { 
NonZeroUsize::new(5).unwrap(), NonZeroUsize::new(100).unwrap(), ) - .into_item_size_config(|x: &u32| *x as usize), + .as_item_size_config(|x: &u32| *x as usize), ); let batches: Vec<_> = batcher.collect().await; assert_eq!( @@ -176,7 +173,7 @@ mod test { NonZeroUsize::new(5).unwrap(), NonZeroUsize::new(100).unwrap(), ) - .into_item_size_config(|x: &u32| *x as usize), + .as_item_size_config(|x: &u32| *x as usize), ); tokio::pin!(batcher); diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs index 400d74aa19188..8573e86f707e0 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-core/src/stream/partitioned_batcher.rs @@ -129,15 +129,15 @@ impl BatcherSettings { /// A batcher config using the `ByteSizeOf` trait to determine batch sizes. /// The output is a `Vec`. - pub fn into_byte_size_config( - self, + pub fn as_byte_size_config( + &self, ) -> BatchConfigParts, Vec> { - self.into_item_size_config(ByteSizeOfItemSize) + self.as_item_size_config(ByteSizeOfItemSize) } /// A batcher config using the `ItemBatchSize` trait to determine batch sizes. /// The output is a `Vec`. 
- pub fn into_item_size_config(self, item_size: I) -> BatchConfigParts, Vec> + pub fn as_item_size_config(&self, item_size: I) -> BatchConfigParts, Vec> where I: ItemBatchSize, { @@ -472,7 +472,7 @@ mod test { let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, timer, - Box::new(move || batch_settings.into_byte_size_config())); + Box::new(move || batch_settings.as_byte_size_config())); let batcher_size_hint = batcher.size_hint(); assert_eq!(stream_size_hint, batcher_size_hint); @@ -496,7 +496,7 @@ mod test { let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, - timer, Box::new(move || batch_settings.into_byte_size_config())); + timer, Box::new(move || batch_settings.as_byte_size_config())); let mut batcher = Pin::new(&mut batcher); loop { @@ -567,7 +567,7 @@ mod test { let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, - timer, Box::new(move || batch_settings.clone().into_byte_size_config())); + timer, Box::new(move || batch_settings.as_byte_size_config())); let mut batcher = Pin::new(&mut batcher); loop { @@ -614,9 +614,7 @@ mod test { let allocation_limit = NonZeroUsize::new(allocation_limit as usize).unwrap(); let batch_settings = BatcherSettings::new(Duration::from_secs(1), allocation_limit, item_limit); let mut batcher = PartitionedBatcher::with_timer(&mut stream, partitioner, - timer, Box::new(move || batch_settings.clone().into_byte_size_config())); - // let mut batcher = PartitionedBatcher::new(&mut stream, partitioner, - // Box::new(move || 
batch_settings.clone().into_byte_size_config())); + timer, Box::new(move || batch_settings.as_byte_size_config())); let mut batcher = Pin::new(&mut batcher); let mut observed_items = 0; diff --git a/src/sinks/appsignal/sink.rs b/src/sinks/appsignal/sink.rs index 1ad0e3e0c8779..e066b6dd94a30 100644 --- a/src/sinks/appsignal/sink.rs +++ b/src/sinks/appsignal/sink.rs @@ -34,7 +34,7 @@ where Some(event) }) }) - .batched(self.batch_settings.into_byte_size_config()) + .batched(self.batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), AppsignalRequestBuilder { diff --git a/src/sinks/aws_cloudwatch_logs/sink.rs b/src/sinks/aws_cloudwatch_logs/sink.rs index 80672b3b55253..b23d752a58977 100644 --- a/src/sinks/aws_cloudwatch_logs/sink.rs +++ b/src/sinks/aws_cloudwatch_logs/sink.rs @@ -51,7 +51,7 @@ where }) .batched_partitioned( CloudwatchPartitioner, - Box::new(move || batcher_settings.clone().into_byte_size_config()), + Box::new(move || batcher_settings.as_byte_size_config()), ) .map(|(key, events)| { let metadata = RequestMetadata::from_batch( diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index eb94a159e19c2..7e72234ab1743 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -69,7 +69,7 @@ where KinesisPartitioner { _phantom: PhantomData, }, - Box::new(move || batch_settings.clone().into_byte_size_config()), + Box::new(move || batch_settings.as_byte_size_config()), ) .map(|(key, events)| { let metadata = RequestMetadata::from_batch( diff --git a/src/sinks/azure_common/sink.rs b/src/sinks/azure_common/sink.rs index a97b669c1134f..4f8c8c11c7fc2 100644 --- a/src/sinks/azure_common/sink.rs +++ b/src/sinks/azure_common/sink.rs @@ -44,7 +44,7 @@ where input .batched_partitioned( partitioner, - Box::new(move || settings.clone().into_byte_size_config()), + Box::new(move || settings.as_byte_size_config()), ) .filter_map(|(key, batch)| async move { // We don't need to emit an 
error here if the event is dropped since this will occur if the template diff --git a/src/sinks/azure_monitor_logs/sink.rs b/src/sinks/azure_monitor_logs/sink.rs index ce06e04bb83b8..20db422f01420 100644 --- a/src/sinks/azure_monitor_logs/sink.rs +++ b/src/sinks/azure_monitor_logs/sink.rs @@ -39,7 +39,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { input - .batched(self.batch_settings.into_byte_size_config()) + .batched(self.batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), AzureMonitorLogsRequestBuilder { diff --git a/src/sinks/clickhouse/sink.rs b/src/sinks/clickhouse/sink.rs index 949b2748fc1fa..62af596bb1f07 100644 --- a/src/sinks/clickhouse/sink.rs +++ b/src/sinks/clickhouse/sink.rs @@ -47,7 +47,7 @@ impl ClickhouseSink { input .batched_partitioned( KeyPartitioner::new(self.database, self.table), - Box::new(move || batch_settings.clone().into_byte_size_config()), + Box::new(move || batch_settings.as_byte_size_config()), ) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder( diff --git a/src/sinks/databend/sink.rs b/src/sinks/databend/sink.rs index 87b3d1155cc9d..dcb625ce1ffe6 100644 --- a/src/sinks/databend/sink.rs +++ b/src/sinks/databend/sink.rs @@ -24,7 +24,7 @@ impl DatabendSink { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { input - .batched(self.batch_settings.into_byte_size_config()) + .batched(self.batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 6055866e3e85e..98afac3f4ca1f 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -265,7 +265,7 @@ where let input = input.batched_partitioned( partitioner, - Box::new(move || batch_settings.clone().into_byte_size_config()), + Box::new(move || 
batch_settings.as_byte_size_config()), ); input .request_builder( diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index e7da4f2036015..7788c0a55bbfa 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -101,7 +101,7 @@ where // distributions, aggregated histograms, and sketches. .batched_partitioned( DatadogMetricsTypePartitioner, - Box::new(move || batch_settings.clone().into_byte_size_config()), + Box::new(move || batch_settings.as_byte_size_config()), ) // Aggregate counters with identical timestamps, otherwise identical counters (same // series and same timestamp, when rounded to whole seconds) will be dropped in a diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index c28b053089297..52e7ed237c056 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -113,7 +113,7 @@ where input .batched_partitioned( EventPartitioner, - Box::new(move || batch_settings.clone().into_byte_size_config()), + Box::new(move || batch_settings.as_byte_size_config()), ) .incremental_request_builder(self.request_builder) .flat_map(stream::iter) diff --git a/src/sinks/elasticsearch/sink.rs b/src/sinks/elasticsearch/sink.rs index a3168d5b44809..5d3c043e233ce 100644 --- a/src/sinks/elasticsearch/sink.rs +++ b/src/sinks/elasticsearch/sink.rs @@ -81,7 +81,7 @@ where .filter_map(move |log| { future::ready(process_log(log, &mode, id_key_field, &transformer)) }) - .batched(self.batch_settings.into_byte_size_config()) + .batched(self.batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/gcp/stackdriver/logs/sink.rs b/src/sinks/gcp/stackdriver/logs/sink.rs index 39870bca24432..fbf3e2066d420 100644 --- a/src/sinks/gcp/stackdriver/logs/sink.rs +++ b/src/sinks/gcp/stackdriver/logs/sink.rs @@ -36,10 +36,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> 
Result<(), ()> { input // Batch the input stream with size calculation based on the estimated encoded json size - .batched( - self.batch_settings - .into_item_size_config(HttpJsonBatchSizer), - ) + .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer)) // Build requests with no concurrency limit. .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/gcs_common/sink.rs b/src/sinks/gcs_common/sink.rs index 7955d064052e0..4e6326fd8dc2e 100644 --- a/src/sinks/gcs_common/sink.rs +++ b/src/sinks/gcs_common/sink.rs @@ -47,7 +47,7 @@ where input .batched_partitioned( partitioner, - Box::new(move || settings.clone().into_byte_size_config()), + Box::new(move || settings.as_byte_size_config()), ) .filter_map(|(key, batch)| async move { // A `TemplateRenderingError` will have been emitted by `KeyPartitioner` if the key here is `None`, diff --git a/src/sinks/greptimedb/sink.rs b/src/sinks/greptimedb/sink.rs index fc8e75140708d..b4b36a10bf5db 100644 --- a/src/sinks/greptimedb/sink.rs +++ b/src/sinks/greptimedb/sink.rs @@ -37,7 +37,7 @@ impl GreptimeDBSink { .normalized_with_default::() .batched( self.batch_settings - .into_item_size_config(GreptimeDBBatchSizer), + .as_item_size_config(GreptimeDBBatchSizer), ) .map(GreptimeDBRequest::from_metrics) .into_driver(self.service) diff --git a/src/sinks/honeycomb/sink.rs b/src/sinks/honeycomb/sink.rs index 235dcd6eb4893..fe2bf4cdfc598 100644 --- a/src/sinks/honeycomb/sink.rs +++ b/src/sinks/honeycomb/sink.rs @@ -36,10 +36,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { input // Batch the input stream with size calculation based on the estimated encoded json size - .batched( - self.batch_settings - .into_item_size_config(HttpJsonBatchSizer), - ) + .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer)) // Build requests with default concurrency limit. 
.request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/http/sink.rs b/src/sinks/http/sink.rs index 8427201b31662..575e558e2d013 100644 --- a/src/sinks/http/sink.rs +++ b/src/sinks/http/sink.rs @@ -33,7 +33,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { input // Batch the input stream with size calculation based on the configured codec - .batched(self.batch_settings.into_item_size_config(HttpBatchSizer { + .batched(self.batch_settings.as_item_size_config(HttpBatchSizer { encoder: self.request_builder.encoder.encoder.clone(), })) // Build requests with default concurrency limit. diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 4619be1370dd6..8563e2b45286a 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -461,7 +461,7 @@ impl LokiSink { .map(|record| filter.filter_record(record)) .batched_partitioned( RecordPartitioner, - Box::new(move || batch_settings.clone().into_byte_size_config()), + Box::new(move || batch_settings.as_byte_size_config()), ) .filter_map(|(partition, batch)| async { if let Some(partition) = partition { diff --git a/src/sinks/new_relic/sink.rs b/src/sinks/new_relic/sink.rs index fd93c90bbd61b..e8976e9131026 100644 --- a/src/sinks/new_relic/sink.rs +++ b/src/sinks/new_relic/sink.rs @@ -123,7 +123,7 @@ where let protocol = get_http_scheme_from_uri(&self.credentials.get_uri()); input - .batched(self.batcher_settings.into_byte_size_config()) + .batched(self.batcher_settings.as_byte_size_config()) .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map( |request: Result| async move { diff --git a/src/sinks/opendal_common.rs b/src/sinks/opendal_common.rs index ba94b05e145d0..e63d570ae08d8 100644 --- a/src/sinks/opendal_common.rs +++ b/src/sinks/opendal_common.rs @@ -80,7 +80,7 @@ where input .batched_partitioned( partitioner, - Box::new(move || settings.clone().into_byte_size_config()), + Box::new(move || 
settings.as_byte_size_config()), ) .filter_map(|(key, batch)| async move { // We don't need to emit an error here if the event is dropped since this will occur if the template diff --git a/src/sinks/redis/sink.rs b/src/sinks/redis/sink.rs index a9dcad68682ff..128ce05adddef 100644 --- a/src/sinks/redis/sink.rs +++ b/src/sinks/redis/sink.rs @@ -86,7 +86,7 @@ impl RedisSink { let mut encoder = self.encoder.clone(); let transformer = self.transformer.clone(); - let batcher_settings = self.batcher_settings.into_byte_size_config(); + let batcher_settings = self.batcher_settings.as_byte_size_config(); input .filter_map(|event| future::ready(self.make_redis_event(event))) diff --git a/src/sinks/s3_common/sink.rs b/src/sinks/s3_common/sink.rs index 57bf735fbb687..7378854a17dd3 100644 --- a/src/sinks/s3_common/sink.rs +++ b/src/sinks/s3_common/sink.rs @@ -46,7 +46,7 @@ where input .batched_partitioned( partitioner, - Box::new(move || settings.clone().into_byte_size_config()), + Box::new(move || settings.as_byte_size_config()), ) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder(default_request_builder_concurrency_limit(), request_builder) diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index 9ba19c5ffcad8..b4e488ac7c1da 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -78,7 +78,7 @@ where } else { EventPartitioner::new(None, None, None, None) }, - Box::new(move || batch_settings.clone().into_byte_size_config()), + Box::new(move || batch_settings.as_byte_size_config()), ) .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/splunk_hec/metrics/sink.rs b/src/sinks/splunk_hec/metrics/sink.rs index f5e07c18d73fa..dbc847ec816b9 100644 --- a/src/sinks/splunk_hec/metrics/sink.rs +++ b/src/sinks/splunk_hec/metrics/sink.rs @@ -56,7 +56,7 @@ where }) .batched_partitioned( EventPartitioner, - Box::new(move || 
batch_settings.clone().into_byte_size_config()), + Box::new(move || batch_settings.as_byte_size_config()), ) .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/statsd/sink.rs b/src/sinks/statsd/sink.rs index a9f0bdf44a9e2..f5c77e91b4f86 100644 --- a/src/sinks/statsd/sink.rs +++ b/src/sinks/statsd/sink.rs @@ -58,7 +58,7 @@ where // other metric types in type-specific ways i.e. incremental gauge updates use a // different syntax, etc. .normalized_with_default::() - .batched(self.batch_settings.into_item_size_config(StatsdBatchSizer)) + .batched(self.batch_settings.as_item_size_config(StatsdBatchSizer)) // We build our requests "incrementally", which means that for a single batch of // metrics, we might generate N requests to represent all of the metrics in the batch. // From e6b052d9efeb08e7e3e7bdb6de05b7981954ba2e Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 6 Oct 2023 12:43:25 +0100 Subject: [PATCH 04/10] Spelling Signed-off-by: Stephen Wakely --- lib/vector-core/src/stream/partitioned_batcher.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs index 8573e86f707e0..1497ff9b53aa0 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-core/src/stream/partitioned_batcher.rs @@ -183,7 +183,7 @@ pub struct PartitionedBatcher where Prt: Partitioner, { - /// A closure that retrievs a new `BatchConfig` when needed to batch a + /// A closure that retrieves a new [`BatchConfig`] when needed to batch a /// new partition. state: Box C + Send>, /// The store of live batches. 
Note that the key here is an option type, From 179f35173393cd7b40e7d9b3e57af27b2753d895 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 6 Oct 2023 13:09:02 +0100 Subject: [PATCH 05/10] Don't box the closure Signed-off-by: Stephen Wakely --- .../src/stream/partitioned_batcher.rs | 22 +++++++++---------- src/sinks/aws_cloudwatch_logs/sink.rs | 7 +++--- src/sinks/aws_kinesis/sink.rs | 2 +- src/sinks/azure_common/sink.rs | 5 +---- src/sinks/clickhouse/sink.rs | 7 +++--- src/sinks/datadog/logs/sink.rs | 5 +---- src/sinks/datadog/metrics/sink.rs | 7 +++--- src/sinks/datadog/traces/sink.rs | 5 +---- src/sinks/gcs_common/sink.rs | 5 +---- src/sinks/loki/sink.rs | 5 +---- src/sinks/opendal_common.rs | 5 +---- src/sinks/s3_common/sink.rs | 5 +---- src/sinks/splunk_hec/logs/sink.rs | 2 +- src/sinks/splunk_hec/metrics/sink.rs | 5 +---- src/sinks/util/builder.rs | 7 +++--- 15 files changed, 33 insertions(+), 61 deletions(-) diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs index 1497ff9b53aa0..68bb989ed371b 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-core/src/stream/partitioned_batcher.rs @@ -179,13 +179,13 @@ impl BatcherSettings { } #[pin_project] -pub struct PartitionedBatcher +pub struct PartitionedBatcher where Prt: Partitioner, { /// A closure that retrieves a new [`BatchConfig`] when needed to batch a /// new partition. - state: Box C + Send>, + state: F, /// The store of live batches. Note that the key here is an option type, /// on account of the interface of `Prt`. 
batches: HashMap>, @@ -202,15 +202,16 @@ where stream: Fuse, } -impl PartitionedBatcher, C> +impl PartitionedBatcher, C, F> where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, C: BatchConfig, + F: Fn() -> C + Send, { - pub fn new(stream: St, partitioner: Prt, settings: Box C + Send>) -> Self { + pub fn new(stream: St, partitioner: Prt, settings: F) -> Self { let timeout = settings().timeout(); Self { state: settings, @@ -224,20 +225,16 @@ where } #[cfg(test)] -impl PartitionedBatcher +impl PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, Prt::Key: Eq + Hash + Clone, Prt::Item: ByteSizeOf, C: BatchConfig, + F: Fn() -> C + Send, { - pub fn with_timer( - stream: St, - partitioner: Prt, - timer: KT, - settings: Box C + Send>, - ) -> Self { + pub fn with_timer(stream: St, partitioner: Prt, timer: KT, settings: F) -> Self { Self { state: settings, batches: HashMap::default(), @@ -249,7 +246,7 @@ where } } -impl Stream for PartitionedBatcher +impl Stream for PartitionedBatcher where St: Stream, Prt: Partitioner + Unpin, @@ -257,6 +254,7 @@ where Prt::Item: ByteSizeOf, KT: KeyedTimer, C: BatchConfig>, + F: Fn() -> C + Send, { type Item = (Prt::Key, Vec); diff --git a/src/sinks/aws_cloudwatch_logs/sink.rs b/src/sinks/aws_cloudwatch_logs/sink.rs index b23d752a58977..5d71f8d0b605e 100644 --- a/src/sinks/aws_cloudwatch_logs/sink.rs +++ b/src/sinks/aws_cloudwatch_logs/sink.rs @@ -49,10 +49,9 @@ where let age_range = start..end; future::ready(age_range.contains(&req.timestamp)) }) - .batched_partitioned( - CloudwatchPartitioner, - Box::new(move || batcher_settings.as_byte_size_config()), - ) + .batched_partitioned(CloudwatchPartitioner, || { + batcher_settings.as_byte_size_config() + }) .map(|(key, events)| { let metadata = RequestMetadata::from_batch( events.iter().map(|req| req.get_metadata().clone()), diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index 7e72234ab1743..60e056d2811ad 100644 
--- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -69,7 +69,7 @@ where KinesisPartitioner { _phantom: PhantomData, }, - Box::new(move || batch_settings.as_byte_size_config()), + || batch_settings.as_byte_size_config(), ) .map(|(key, events)| { let metadata = RequestMetadata::from_batch( diff --git a/src/sinks/azure_common/sink.rs b/src/sinks/azure_common/sink.rs index 4f8c8c11c7fc2..a954217a15f33 100644 --- a/src/sinks/azure_common/sink.rs +++ b/src/sinks/azure_common/sink.rs @@ -42,10 +42,7 @@ where let request_builder = self.request_builder; input - .batched_partitioned( - partitioner, - Box::new(move || settings.as_byte_size_config()), - ) + .batched_partitioned(partitioner, || settings.as_byte_size_config()) .filter_map(|(key, batch)| async move { // We don't need to emit an error here if the event is dropped since this will occur if the template // couldn't be rendered during the partitioning. A `TemplateRenderingError` is already emitted when diff --git a/src/sinks/clickhouse/sink.rs b/src/sinks/clickhouse/sink.rs index 62af596bb1f07..987bc5c1e34e3 100644 --- a/src/sinks/clickhouse/sink.rs +++ b/src/sinks/clickhouse/sink.rs @@ -45,10 +45,9 @@ impl ClickhouseSink { let batch_settings = self.batch_settings.clone(); input - .batched_partitioned( - KeyPartitioner::new(self.database, self.table), - Box::new(move || batch_settings.as_byte_size_config()), - ) + .batched_partitioned(KeyPartitioner::new(self.database, self.table), || { + batch_settings.as_byte_size_config() + }) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 98afac3f4ca1f..2abad6286bd47 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -263,10 +263,7 @@ where let partitioner = EventPartitioner; let batch_settings = self.batch_settings.clone(); - let input = 
input.batched_partitioned( - partitioner, - Box::new(move || batch_settings.as_byte_size_config()), - ); + let input = input.batched_partitioned(partitioner, || batch_settings.as_byte_size_config()); input .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index 7788c0a55bbfa..23ea4e2a77227 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -99,10 +99,9 @@ where .normalized_with_default::() // We batch metrics by their endpoint: series endpoint for counters, gauge, and sets vs sketch endpoint for // distributions, aggregated histograms, and sketches. - .batched_partitioned( - DatadogMetricsTypePartitioner, - Box::new(move || batch_settings.as_byte_size_config()), - ) + .batched_partitioned(DatadogMetricsTypePartitioner, || { + batch_settings.as_byte_size_config() + }) // Aggregate counters with identical timestamps, otherwise identical counters (same // series and same timestamp, when rounded to whole seconds) will be dropped in a // last-write-wins situation when they hit the DD metrics intake. 
diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index 52e7ed237c056..dd09b57a64d86 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -111,10 +111,7 @@ where let batch_settings = self.batch_settings.clone(); input - .batched_partitioned( - EventPartitioner, - Box::new(move || batch_settings.as_byte_size_config()), - ) + .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config()) .incremental_request_builder(self.request_builder) .flat_map(stream::iter) .filter_map(|request| async move { diff --git a/src/sinks/gcs_common/sink.rs b/src/sinks/gcs_common/sink.rs index 4e6326fd8dc2e..1f1971a15152e 100644 --- a/src/sinks/gcs_common/sink.rs +++ b/src/sinks/gcs_common/sink.rs @@ -45,10 +45,7 @@ where let request_builder = self.request_builder; input - .batched_partitioned( - partitioner, - Box::new(move || settings.as_byte_size_config()), - ) + .batched_partitioned(partitioner, || settings.as_byte_size_config()) .filter_map(|(key, batch)| async move { // A `TemplateRenderingError` will have been emitted by `KeyPartitioner` if the key here is `None`, // thus no further `EventsDropped` event needs emitting at this stage. 
diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 8563e2b45286a..1c588bcb4d78c 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -459,10 +459,7 @@ impl LokiSink { .map(|event| encoder.encode_event(event)) .filter_map(|event| async { event }) .map(|record| filter.filter_record(record)) - .batched_partitioned( - RecordPartitioner, - Box::new(move || batch_settings.as_byte_size_config()), - ) + .batched_partitioned(RecordPartitioner, || batch_settings.as_byte_size_config()) .filter_map(|(partition, batch)| async { if let Some(partition) = partition { let mut count: usize = 0; diff --git a/src/sinks/opendal_common.rs b/src/sinks/opendal_common.rs index e63d570ae08d8..702ace4227da7 100644 --- a/src/sinks/opendal_common.rs +++ b/src/sinks/opendal_common.rs @@ -78,10 +78,7 @@ where let request_builder = self.request_builder; input - .batched_partitioned( - partitioner, - Box::new(move || settings.as_byte_size_config()), - ) + .batched_partitioned(partitioner, || settings.as_byte_size_config()) .filter_map(|(key, batch)| async move { // We don't need to emit an error here if the event is dropped since this will occur if the template // couldn't be rendered during the partitioning. 
A `TemplateRenderingError` is already emitted when diff --git a/src/sinks/s3_common/sink.rs b/src/sinks/s3_common/sink.rs index 7378854a17dd3..6d957daa4785b 100644 --- a/src/sinks/s3_common/sink.rs +++ b/src/sinks/s3_common/sink.rs @@ -44,10 +44,7 @@ where let request_builder = self.request_builder; input - .batched_partitioned( - partitioner, - Box::new(move || settings.as_byte_size_config()), - ) + .batched_partitioned(partitioner, || settings.as_byte_size_config()) .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) }) .request_builder(default_request_builder_concurrency_limit(), request_builder) .filter_map(|request| async move { diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index b4e488ac7c1da..a9a3b1d23e76c 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -78,7 +78,7 @@ where } else { EventPartitioner::new(None, None, None, None) }, - Box::new(move || batch_settings.as_byte_size_config()), + || batch_settings.as_byte_size_config(), ) .request_builder( default_request_builder_concurrency_limit(), diff --git a/src/sinks/splunk_hec/metrics/sink.rs b/src/sinks/splunk_hec/metrics/sink.rs index dbc847ec816b9..d8a4e23b57d77 100644 --- a/src/sinks/splunk_hec/metrics/sink.rs +++ b/src/sinks/splunk_hec/metrics/sink.rs @@ -54,10 +54,7 @@ where default_namespace, )) }) - .batched_partitioned( - EventPartitioner, - Box::new(move || batch_settings.as_byte_size_config()), - ) + .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config()) .request_builder( default_request_builder_concurrency_limit(), self.request_builder, diff --git a/src/sinks/util/builder.rs b/src/sinks/util/builder.rs index 30025bbdf9bf1..a25f0badb6f70 100644 --- a/src/sinks/util/builder.rs +++ b/src/sinks/util/builder.rs @@ -44,17 +44,18 @@ pub trait SinkBuilderExt: Stream { /// The stream will yield batches of events, with their partition key, when either a batch fills /// up or times out. 
[`Partitioner`] operates on a per-event basis, and has access to the event /// itself, and so can access any and all fields of an event. - fn batched_partitioned( + fn batched_partitioned( self, partitioner: P, - settings: Box C + Send>, - ) -> PartitionedBatcher, C> + settings: F, + ) -> PartitionedBatcher, C, F> where Self: Stream + Sized, P: Partitioner + Unpin, P::Key: Eq + Hash + Clone, P::Item: ByteSizeOf, C: BatchConfig, + F: Fn() -> C + Send, { PartitionedBatcher::new(self, partitioner, settings) } From f8967b7bae3dc68dc41fe18ec49c3edf100183cc Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 6 Oct 2023 13:26:16 +0100 Subject: [PATCH 06/10] Clippy Signed-off-by: Stephen Wakely --- .../src/stream/partitioned_batcher.rs | 20 ++++++++----------- src/sinks/aws_kinesis/sink.rs | 2 +- src/sinks/azure_common/sink.rs | 2 +- src/sinks/clickhouse/sink.rs | 2 +- src/sinks/datadog/logs/sink.rs | 2 +- src/sinks/datadog/metrics/sink.rs | 2 +- src/sinks/datadog/traces/sink.rs | 2 +- src/sinks/loki/sink.rs | 2 +- src/sinks/opendal_common.rs | 2 +- src/sinks/s3_common/sink.rs | 3 +-- src/sinks/splunk_hec/logs/sink.rs | 2 +- src/sinks/splunk_hec/metrics/sink.rs | 2 +- 12 files changed, 19 insertions(+), 24 deletions(-) diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs index 68bb989ed371b..e3899a691d31d 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-core/src/stream/partitioned_batcher.rs @@ -302,15 +302,14 @@ where let item_key = this.partitioner.partition(&item); // Get the batch for this partition, or create a new one. 
- let batch = match this.batches.get_mut(&item_key) { - Some(batch) => batch, - None => { - let batch = (this.state)(); - this.batches.insert(item_key.clone(), batch); - this.batches - .get_mut(&item_key) - .expect("batch has just been inserted so should exist") - } + let batch = if let Some(batch) = this.batches.get_mut(&item_key) { + batch + } else { + let batch = (this.state)(); + this.batches.insert(item_key.clone(), batch); + this.batches + .get_mut(&item_key) + .expect("batch has just been inserted so should exist") }; let (fits, metadata) = batch.item_fits_in_batch(&item); @@ -579,9 +578,6 @@ mod test { .get_mut(&key) .expect("impossible situation"); - dbg!(&actual_batch); - dbg!(&expected_partition); - for item in actual_batch { assert_eq!(item, expected_partition.pop().unwrap()); } diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index 60e056d2811ad..6dfb0be08d19e 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -42,7 +42,7 @@ where R: Record + Send + Sync + Unpin + Clone + 'static, { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let batch_settings = self.batch_settings.clone(); + let batch_settings = self.batch_settings; input .filter_map(|event| { diff --git a/src/sinks/azure_common/sink.rs b/src/sinks/azure_common/sink.rs index a954217a15f33..4844474232cd0 100644 --- a/src/sinks/azure_common/sink.rs +++ b/src/sinks/azure_common/sink.rs @@ -37,7 +37,7 @@ where { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let partitioner = self.partitioner; - let settings = self.batcher_settings.clone(); + let settings = self.batcher_settings; let request_builder = self.request_builder; diff --git a/src/sinks/clickhouse/sink.rs b/src/sinks/clickhouse/sink.rs index 987bc5c1e34e3..be9c28a0e83b4 100644 --- a/src/sinks/clickhouse/sink.rs +++ b/src/sinks/clickhouse/sink.rs @@ -42,7 +42,7 @@ impl ClickhouseSink { } async fn run_inner(self: Box, 
input: BoxStream<'_, Event>) -> Result<(), ()> { - let batch_settings = self.batch_settings.clone(); + let batch_settings = self.batch_settings; input .batched_partitioned(KeyPartitioner::new(self.database, self.table), || { diff --git a/src/sinks/datadog/logs/sink.rs b/src/sinks/datadog/logs/sink.rs index 2abad6286bd47..7b1245edb6389 100644 --- a/src/sinks/datadog/logs/sink.rs +++ b/src/sinks/datadog/logs/sink.rs @@ -261,7 +261,7 @@ where let default_api_key = Arc::clone(&self.default_api_key); let partitioner = EventPartitioner; - let batch_settings = self.batch_settings.clone(); + let batch_settings = self.batch_settings; let input = input.batched_partitioned(partitioner, || batch_settings.as_byte_size_config()); input diff --git a/src/sinks/datadog/metrics/sink.rs b/src/sinks/datadog/metrics/sink.rs index 23ea4e2a77227..0b50549acefe0 100644 --- a/src/sinks/datadog/metrics/sink.rs +++ b/src/sinks/datadog/metrics/sink.rs @@ -84,7 +84,7 @@ where async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let mut splitter: MetricSplitter = MetricSplitter::default(); - let batch_settings = self.batch_settings.clone(); + let batch_settings = self.batch_settings; input // Convert `Event` to `Metric` so we don't have to deal with constant conversions. 
diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index dd09b57a64d86..384d7ef635022 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -108,7 +108,7 @@ where } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let batch_settings = self.batch_settings.clone(); + let batch_settings = self.batch_settings; input .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config()) diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 1c588bcb4d78c..1b0d45bb3092b 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -453,7 +453,7 @@ impl LokiSink { NonZeroUsize::new(1).expect("static") } }; - let batch_settings = self.batch_settings.clone(); + let batch_settings = self.batch_settings; input .map(|event| encoder.encode_event(event)) diff --git a/src/sinks/opendal_common.rs b/src/sinks/opendal_common.rs index 702ace4227da7..34b4962771835 100644 --- a/src/sinks/opendal_common.rs +++ b/src/sinks/opendal_common.rs @@ -73,7 +73,7 @@ where { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let partitioner = self.partitioner; - let settings = self.batcher_settings.clone(); + let settings = self.batcher_settings; let request_builder = self.request_builder; diff --git a/src/sinks/s3_common/sink.rs b/src/sinks/s3_common/sink.rs index 6d957daa4785b..fd6733a484a75 100644 --- a/src/sinks/s3_common/sink.rs +++ b/src/sinks/s3_common/sink.rs @@ -39,8 +39,7 @@ where { async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let partitioner = self.partitioner; - let settings = self.batcher_settings.clone(); - + let settings = self.batcher_settings; let request_builder = self.request_builder; input diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index a9a3b1d23e76c..7fed3bf3310e0 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ 
-61,7 +61,7 @@ where timestamp_key: self.timestamp_key.clone(), endpoint_target: self.endpoint_target, }; - let batch_settings = self.batch_settings.clone(); + let batch_settings = self.batch_settings; input .map(move |event| process_log(event, &data)) diff --git a/src/sinks/splunk_hec/metrics/sink.rs b/src/sinks/splunk_hec/metrics/sink.rs index d8a4e23b57d77..6f6ad5d74b9dd 100644 --- a/src/sinks/splunk_hec/metrics/sink.rs +++ b/src/sinks/splunk_hec/metrics/sink.rs @@ -39,7 +39,7 @@ where let index = self.index.as_ref(); let host_key = self.host_key.as_ref(); let default_namespace = self.default_namespace.as_deref(); - let batch_settings = self.batch_settings.clone(); + let batch_settings = self.batch_settings; input .map(|event| (event.size_of(), event.into_metric())) From 896d76faef19c38553e4bbaae75424da34f1415d Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 6 Oct 2023 16:05:26 +0100 Subject: [PATCH 07/10] Only insert timeout if we add the batch to the list Signed-off-by: Stephen Wakely --- lib/vector-core/src/stream/partitioned_batcher.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs index e3899a691d31d..ab82a34bda404 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-core/src/stream/partitioned_batcher.rs @@ -323,13 +323,14 @@ where // Insert the item into the batch. batch.push(item, metadata); - this.timer.insert(item_key.clone()); if batch.is_batch_full() { // If the insertion means the batch is now full, we clear out the batch and // remove it from the list. 
this.closed_batches .push((item_key.clone(), batch.take_batch())); this.batches.remove(&item_key); + } else { + this.timer.insert(item_key.clone()); } } } From 817bc4623156f03453bf8f48e5109de96955d6a9 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Fri, 6 Oct 2023 16:28:50 +0100 Subject: [PATCH 08/10] Allow the timer to remove an item Signed-off-by: Stephen Wakely --- lib/vector-core/src/stream/partitioned_batcher.rs | 11 ++++++++++- lib/vector-core/src/time.rs | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs index ab82a34bda404..e6afbf4ab79fc 100644 --- a/lib/vector-core/src/stream/partitioned_batcher.rs +++ b/lib/vector-core/src/stream/partitioned_batcher.rs @@ -86,6 +86,10 @@ where } } + fn remove(&mut self, item_key: &K) { + self.expiration_map.remove(item_key); + } + fn poll_expired(&mut self, cx: &mut Context) -> Poll> { match ready!(self.expirations.poll_expired(cx)) { // No expirations yet. @@ -329,8 +333,9 @@ where this.closed_batches .push((item_key.clone(), batch.take_batch())); this.batches.remove(&item_key); + this.timer.remove(&item_key); } else { - this.timer.insert(item_key.clone()); + this.timer.insert(item_key); } } } @@ -392,6 +397,10 @@ mod test { self.valid_keys.insert(item_key); } + fn remove(&mut self, item_key: &u8) { + self.valid_keys.remove(item_key); + } + fn poll_expired(&mut self, _cx: &mut Context) -> Poll> { match self.responses.pop() { Some(Poll::Pending) => unreachable!(), diff --git a/lib/vector-core/src/time.rs b/lib/vector-core/src/time.rs index 49481842cfcdd..e4e4e4975ebe4 100644 --- a/lib/vector-core/src/time.rs +++ b/lib/vector-core/src/time.rs @@ -21,6 +21,9 @@ pub trait KeyedTimer { /// If the given key already exists in the timer, the underlying subtimer is reset. fn insert(&mut self, item_key: K); + /// Removes a subtimer from the list. 
+ fn remove(&mut self, item_key: &K);
+
 /// Attempts to pull out the next expired subtimer in the queue.
 ///
 /// The key of the subtimer is returned if it has expired, otherwise, returns `None` if the
From ba0e8d68fa4e28e0f7dd2c0d80ba3ee954eed21c Mon Sep 17 00:00:00 2001
From: Stephen Wakely 
Date: Thu, 12 Oct 2023 11:13:55 +0100
Subject: [PATCH 09/10] Update stackdriver metrics batch settings config

Signed-off-by: Stephen Wakely 

---
 src/sinks/gcp/stackdriver/metrics/sink.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sinks/gcp/stackdriver/metrics/sink.rs b/src/sinks/gcp/stackdriver/metrics/sink.rs
index 538a666bdd91d..91ba22a640824 100644
--- a/src/sinks/gcp/stackdriver/metrics/sink.rs
+++ b/src/sinks/gcp/stackdriver/metrics/sink.rs
@@ -66,7 +66,7 @@ where
             })
         })
         .normalized_with_default::()
-        .batched(self.batch_settings.into_byte_size_config())
+        .batched(self.batch_settings.as_byte_size_config())
         .request_builder(
             default_request_builder_concurrency_limit(),
             self.request_builder,

From 9a3a00a3dc391ae4ae59509070408eebe883b19c Mon Sep 17 00:00:00 2001
From: Stephen Wakely 
Date: Mon, 16 Oct 2023 12:32:45 +0100
Subject: [PATCH 10/10] Don't panic when timer references a batch that no longer
 exists

Signed-off-by: Stephen Wakely 

---
 .../src/stream/partitioned_batcher.rs         | 36 +++++++++++--------
 1 file changed, 22 insertions(+), 14 deletions(-)

diff --git a/lib/vector-core/src/stream/partitioned_batcher.rs b/lib/vector-core/src/stream/partitioned_batcher.rs
index e6afbf4ab79fc..f7d0d55b9556f 100644
--- a/lib/vector-core/src/stream/partitioned_batcher.rs
+++ b/lib/vector-core/src/stream/partitioned_batcher.rs
@@ -87,7 +87,9 @@ where
     }
 
     fn remove(&mut self, item_key: &K) {
-        self.expiration_map.remove(item_key);
+        if let Some(expiration_key) = self.expiration_map.remove(item_key) {
+            self.expirations.remove(&expiration_key);
+        }
     }
 
     fn poll_expired(&mut self, cx: &mut Context) -> Poll> {
@@ -273,18 +275,20 @@ where
             return
Poll::Ready(this.closed_batches.pop()); } match this.stream.as_mut().poll_next(cx) { - Poll::Pending => match this.timer.poll_expired(cx) { - // Unlike normal streams, `DelayQueue` can return `None` - // here but still be usable later if more entries are added. - Poll::Pending | Poll::Ready(None) => return Poll::Pending, - Poll::Ready(Some(item_key)) => { - let mut batch = this - .batches - .remove(&item_key) - .expect("batch should exist if it is set to expire"); - this.closed_batches.push((item_key, batch.take_batch())); + Poll::Pending => { + match this.timer.poll_expired(cx) { + // Unlike normal streams, `DelayQueue` can return `None` + // here but still be usable later if more entries are added. + Poll::Pending | Poll::Ready(None) => return Poll::Pending, + Poll::Ready(Some(item_key)) => { + let mut batch = this + .batches + .remove(&item_key) + .expect("batch should exist if it is set to expire"); + this.closed_batches.push((item_key, batch.take_batch())); + } } - }, + } Poll::Ready(None) => { // Now that the underlying stream is closed, we need to // clear out our batches, including all expiration @@ -311,6 +315,7 @@ where } else { let batch = (this.state)(); this.batches.insert(item_key.clone(), batch); + this.timer.insert(item_key.clone()); this.batches .get_mut(&item_key) .expect("batch has just been inserted so should exist") @@ -323,6 +328,11 @@ where // next iteration. this.closed_batches .push((item_key.clone(), batch.take_batch())); + + // The batch for this partition key was set to + // expire, but now it's overflowed and must be + // pushed out, so now we reset the batch timeout. + this.timer.insert(item_key.clone()); } // Insert the item into the batch. @@ -334,8 +344,6 @@ where .push((item_key.clone(), batch.take_batch())); this.batches.remove(&item_key); this.timer.remove(&item_key); - } else { - this.timer.insert(item_key); } } }