diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 10e92916d3d62..97c4bfdedd83b 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -5,7 +5,9 @@ use std::{collections::HashMap, fmt}; use chrono::Utc; use futures::{Stream, StreamExt}; use metrics::{register_histogram, Histogram}; +use tracing::Span; use vector_buffers::topology::channel::{self, LimitedReceiver, LimitedSender}; +use vector_common::internal_event::{ComponentEventsDropped, UNINTENTIONAL}; #[cfg(test)] use vector_core::event::{into_event_stream, EventStatus}; use vector_core::{ @@ -206,6 +208,9 @@ impl SourceSender { recv } + /// Send an event to the default output. + /// + /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events. pub async fn send_event(&mut self, event: impl Into) -> Result<(), ClosedError> { self.inner .as_mut() @@ -214,6 +219,9 @@ impl SourceSender { .await } + /// Send a stream of events to the default output. + /// + /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events. pub async fn send_event_stream(&mut self, events: S) -> Result<(), ClosedError> where S: Stream + Unpin, @@ -226,10 +234,14 @@ impl SourceSender { .await } + /// Send a batch of events to the default output. + /// + /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events. pub async fn send_batch(&mut self, events: I) -> Result<(), ClosedError> where E: Into + ByteSizeOf, I: IntoIterator, + ::IntoIter: ExactSizeIterator, { self.inner .as_mut() @@ -238,10 +250,14 @@ impl SourceSender { .await } + /// Send a batch of events event to a named output. + /// + /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events. pub async fn send_batch_named(&mut self, name: &str, events: I) -> Result<(), ClosedError> where E: Into + ByteSizeOf, I: IntoIterator, + ::IntoIter: ExactSizeIterator, { self.named_inners .get_mut(name) @@ -251,6 +267,47 @@ impl SourceSender { } } +/// UnsentEvents tracks the number of events yet to be sent in the buffer. This is used to +/// increment the appropriate counters when a future is not polled to completion. Particularly, +/// this is known to happen in a Warp server when a client sends a new HTTP request on a TCP +/// connection that already has a pending request. +/// +/// If its internal count is greater than 0 when dropped, the appropriate [ComponentEventsDropped] +/// event is emitted. +struct UnsentEventCount { + count: usize, + span: Span, +} + +impl UnsentEventCount { + fn new(count: usize) -> Self { + Self { + count, + span: Span::current(), + } + } + + fn decr(&mut self, count: usize) { + self.count = self.count.saturating_sub(count); + } + + fn discard(&mut self) { + self.count = 0; + } +} + +impl Drop for UnsentEventCount { + fn drop(&mut self) { + if self.count > 0 { + let _enter = self.span.enter(); + emit!(ComponentEventsDropped:: { + count: self.count, + reason: "Source send cancelled." + }); + } + } +} + #[derive(Clone)] struct Inner { inner: LimitedSender, @@ -322,7 +379,15 @@ impl Inner { } async fn send_event(&mut self, event: impl Into) -> Result<(), ClosedError> { - self.send(event.into()).await + let event: EventArray = event.into(); + // It's possible that the caller stops polling this future while it is blocked waiting + // on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit + // `ComponentEventsDropped` events. + let count = event.len(); + let mut unsent_event_count = UnsentEventCount::new(count); + let res = self.send(event).await; + unsent_event_count.discard(); + res } async fn send_event_stream(&mut self, events: S) -> Result<(), ClosedError> @@ -341,10 +406,22 @@ impl Inner { where E: Into + ByteSizeOf, I: IntoIterator, + ::IntoIter: ExactSizeIterator, { + // It's possible that the caller stops polling this future while it is blocked waiting + // on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit + // `ComponentEventsDropped` events. let events = events.into_iter().map(Into::into); + let mut unsent_event_count = UnsentEventCount::new(events.len()); for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) { - self.send(events).await?; + let count = events.len(); + self.send(events).await.map_err(|err| { + // The unsent event count is discarded here because the caller emits the + // `StreamClosedError`. + unsent_event_count.discard(); + err + })?; + unsent_event_count.decr(count); } Ok(()) } @@ -394,6 +471,7 @@ fn get_timestamp_millis(value: &Value) -> Option { mod tests { use chrono::{DateTime, Duration}; use rand::{thread_rng, Rng}; + use tokio::time::timeout; use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent}; use vrl::event_path; @@ -483,4 +561,83 @@ mod tests { _ => panic!("source_lag_time_seconds has invalid type"), } } + + #[tokio::test] + async fn emits_component_discarded_events_total_for_send_event() { + metrics::init_test(); + let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1); + + let event = Event::Metric(Metric::new( + "name", + MetricKind::Absolute, + MetricValue::Gauge { value: 123.4 }, + )); + + // First send will succeed. + sender + .send_event(event.clone()) + .await + .expect("First send should not fail"); + + // Second send will timeout, so the future will not be polled to completion. + let res = timeout( + std::time::Duration::from_millis(100), + sender.send_event(event.clone()), + ) + .await; + assert!(res.is_err(), "Send should have timed out."); + + let component_discarded_events_total = Controller::get() + .expect("There must be a controller") + .capture_metrics() + .into_iter() + .filter(|metric| metric.name() == "component_discarded_events_total") + .collect::>(); + assert_eq!(component_discarded_events_total.len(), 1); + + let component_discarded_events_total = &component_discarded_events_total[0]; + let MetricValue::Counter { value } = component_discarded_events_total.value() else { + panic!("component_discarded_events_total has invalid type") + }; + assert_eq!(*value, 1.0); + } + + #[tokio::test] + async fn emits_component_discarded_events_total_for_send_batch() { + metrics::init_test(); + let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1); + + let expected_drop = 100; + let events: Vec = (0..(CHUNK_SIZE + expected_drop)) + .map(|_| { + Event::Metric(Metric::new( + "name", + MetricKind::Absolute, + MetricValue::Gauge { value: 123.4 }, + )) + }) + .collect(); + + // `CHUNK_SIZE` events will be sent into buffer but then the future will not be polled to completion. + let res = timeout( + std::time::Duration::from_millis(100), + sender.send_batch(events), + ) + .await; + assert!(res.is_err(), "Send should have timed out."); + + let component_discarded_events_total = Controller::get() + .expect("There must be a controller") + .capture_metrics() + .into_iter() + .filter(|metric| metric.name() == "component_discarded_events_total") + .collect::>(); + assert_eq!(component_discarded_events_total.len(), 1); + + let component_discarded_events_total = &component_discarded_events_total[0]; + let MetricValue::Counter { value } = component_discarded_events_total.value() else { + panic!("component_discarded_events_total has invalid type") + }; + assert_eq!(*value, expected_drop as f64); + } } diff --git a/src/sources/mongodb_metrics/mod.rs b/src/sources/mongodb_metrics/mod.rs index 16dc2a79a23d3..6c0798ef2278f 100644 --- a/src/sources/mongodb_metrics/mod.rs +++ b/src/sources/mongodb_metrics/mod.rs @@ -139,13 +139,13 @@ impl SourceConfig for MongoDbMetricsConfig { while interval.next().await.is_some() { let start = Instant::now(); let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await; - let count = metrics.len(); emit!(CollectionCompleted { start, end: Instant::now() }); - let metrics = metrics.into_iter().flatten(); + let metrics: Vec = metrics.into_iter().flatten().collect(); + let count = metrics.len(); if (cx.out.send_batch(metrics).await).is_err() { emit!(StreamClosedError { count }); diff --git a/src/sources/nginx_metrics/mod.rs b/src/sources/nginx_metrics/mod.rs index 6a849ddfac51d..b58c3e7d5416f 100644 --- a/src/sources/nginx_metrics/mod.rs +++ b/src/sources/nginx_metrics/mod.rs @@ -127,13 +127,13 @@ impl SourceConfig for NginxMetricsConfig { while interval.next().await.is_some() { let start = Instant::now(); let metrics = join_all(sources.iter().map(|nginx| nginx.collect())).await; - let count = metrics.len(); emit!(CollectionCompleted { start, end: Instant::now() }); - let metrics = metrics.into_iter().flatten(); + let metrics: Vec = metrics.into_iter().flatten().collect(); + let count = metrics.len(); if (cx.out.send_batch(metrics).await).is_err() { emit!(StreamClosedError { count }); diff --git a/src/sources/postgresql_metrics.rs b/src/sources/postgresql_metrics.rs index bca7a56c13317..25fefe02d7d57 100644 --- a/src/sources/postgresql_metrics.rs +++ b/src/sources/postgresql_metrics.rs @@ -220,13 +220,14 @@ impl SourceConfig for PostgresqlMetricsConfig { while interval.next().await.is_some() { let start = Instant::now(); let metrics = join_all(sources.iter_mut().map(|source| source.collect())).await; - let count = metrics.len(); emit!(CollectionCompleted { start, end: Instant::now() }); - let metrics = metrics.into_iter().flatten(); + let metrics: Vec = metrics.into_iter().flatten().collect(); + let count = metrics.len(); + if (cx.out.send_batch(metrics).await).is_err() { emit!(StreamClosedError { count }); return Err(()); diff --git a/src/topology/test/mod.rs b/src/topology/test/mod.rs index 26283229384b1..5519c173b3345 100644 --- a/src/topology/test/mod.rs +++ b/src/topology/test/mod.rs @@ -629,7 +629,9 @@ async fn topology_swap_transform_is_atomic() { } }; let input = async move { - in1.send_batch(iter::from_fn(events)).await.unwrap(); + in1.send_event_stream(stream::iter(iter::from_fn(events))) + .await + .unwrap(); }; let output = out1.for_each(move |_| { recv_counter.fetch_add(1, Ordering::Release);