Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 159 additions & 2 deletions src/source_sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::{collections::HashMap, fmt};
use chrono::Utc;
use futures::{Stream, StreamExt};
use metrics::{register_histogram, Histogram};
use tracing::Span;
use vector_buffers::topology::channel::{self, LimitedReceiver, LimitedSender};
use vector_common::internal_event::{ComponentEventsDropped, UNINTENTIONAL};
#[cfg(test)]
use vector_core::event::{into_event_stream, EventStatus};
use vector_core::{
Expand Down Expand Up @@ -206,6 +208,9 @@ impl SourceSender {
recv
}

/// Send an event to the default output.
///
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
self.inner
.as_mut()
Expand All @@ -214,6 +219,9 @@ impl SourceSender {
.await
}

/// Send a stream of events to the default output.
///
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
where
S: Stream<Item = E> + Unpin,
Expand All @@ -226,10 +234,14 @@ impl SourceSender {
.await
}

/// Send a batch of events to the default output.
///
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
where
E: Into<Event> + ByteSizeOf,
I: IntoIterator<Item = E>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
{
self.inner
.as_mut()
Expand All @@ -238,10 +250,14 @@ impl SourceSender {
.await
}

/// Send a batch of events to a named output.
///
/// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), ClosedError>
where
E: Into<Event> + ByteSizeOf,
I: IntoIterator<Item = E>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
{
self.named_inners
.get_mut(name)
Expand All @@ -251,6 +267,47 @@ impl SourceSender {
}
}

/// `UnsentEventCount` tracks the number of events yet to be sent in the buffer. This is used to
/// increment the appropriate counters when a future is not polled to completion. Particularly,
/// this is known to happen in a Warp server when a client sends a new HTTP request on a TCP
/// connection that already has a pending request.
///
/// If its internal count is greater than 0 when dropped, the appropriate [ComponentEventsDropped]
/// event is emitted.
struct UnsentEventCount {
// Events still counted as unsent; lowered by `decr`, zeroed by `discard`, read on drop.
count: usize,
// Span that was current when the tracker was created; entered on drop before emitting.
span: Span,
}

impl UnsentEventCount {
    /// Builds a tracker for `count` pending events and captures the span that is current at
    /// the time of the call.
    fn new(count: usize) -> Self {
        let span = Span::current();
        Self { count, span }
    }

    /// Subtracts `sent` from the pending total, stopping at zero rather than underflowing.
    fn decr(&mut self, sent: usize) {
        let remaining = self.count.saturating_sub(sent);
        self.count = remaining;
    }

    /// Resets the pending total to zero, so the `Drop` impl has nothing left to report.
    fn discard(&mut self) {
        self.count = 0;
    }
}

impl Drop for UnsentEventCount {
    /// Emits a [ComponentEventsDropped] event for whatever is still counted as unsent.
    fn drop(&mut self) {
        let remaining = self.count;
        if remaining == 0 {
            // Nothing left pending: either everything was sent or the count was discarded.
            return;
        }

        // Enter the span captured at construction so the emit happens inside it.
        let _enter = self.span.enter();
        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
            count: remaining,
            reason: "Source send cancelled."
        });
    }
}

#[derive(Clone)]
struct Inner {
inner: LimitedSender<EventArray>,
Expand Down Expand Up @@ -322,7 +379,15 @@ impl Inner {
}

async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
self.send(event.into()).await
let event: EventArray = event.into();
// It's possible that the caller stops polling this future while it is blocked waiting
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
// `ComponentEventsDropped` events.
let count = event.len();
let mut unsent_event_count = UnsentEventCount::new(count);
let res = self.send(event).await;
unsent_event_count.discard();
res
Comment on lines +387 to +390
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This, on the surface, looks like UnsentEventCount::drop would never happen with count > 0. Is this meant to handle where the task is dropped without sending the events? If so, it might be worth a comment to that effect.

Copy link
Contributor Author

@dsmith3197 dsmith3197 Oct 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. This is to account for the scenarios where the task is dropped on line 385. I can add a comment.

}

async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
Expand All @@ -341,10 +406,22 @@ impl Inner {
where
E: Into<Event> + ByteSizeOf,
I: IntoIterator<Item = E>,
<I as IntoIterator>::IntoIter: ExactSizeIterator,
{
// It's possible that the caller stops polling this future while it is blocked waiting
// on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
// `ComponentEventsDropped` events.
let events = events.into_iter().map(Into::into);
let mut unsent_event_count = UnsentEventCount::new(events.len());
for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
self.send(events).await?;
let count = events.len();
self.send(events).await.map_err(|err| {
// The unsent event count is discarded here because the caller emits the
// `StreamClosedError`.
unsent_event_count.discard();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not clear why this discard is here, setting the count to zero. If the self.send returns with an error, aren't the events still unsent and should be reported as such?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discard is here because the caller handles emitting a StreamClosed error, which increments component_events_discarded_total. I have a follow-up branch to consolidate that logic. For now, I'll add a comment.

err
})?;
unsent_event_count.decr(count);
}
Ok(())
}
Expand Down Expand Up @@ -394,6 +471,7 @@ fn get_timestamp_millis(value: &Value) -> Option<i64> {
mod tests {
use chrono::{DateTime, Duration};
use rand::{thread_rng, Rng};
use tokio::time::timeout;
use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent};
use vrl::event_path;

Expand Down Expand Up @@ -483,4 +561,83 @@ mod tests {
_ => panic!("source_lag_time_seconds has invalid type"),
}
}

/// A `send_event` future that is cancelled while blocked must report its event as discarded.
#[tokio::test]
async fn emits_component_discarded_events_total_for_send_event() {
    metrics::init_test();
    let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

    let gauge = Event::Metric(Metric::new(
        "name",
        MetricKind::Absolute,
        MetricValue::Gauge { value: 123.4 },
    ));

    // First send will succeed.
    let first = sender.send_event(gauge.clone()).await;
    first.expect("First send should not fail");

    // Second send will timeout, so the future will not be polled to completion.
    let deadline = std::time::Duration::from_millis(100);
    let second = timeout(deadline, sender.send_event(gauge.clone())).await;
    assert!(second.is_err(), "Send should have timed out.");

    let captured = Controller::get()
        .expect("There must be a controller")
        .capture_metrics();
    let discarded: Vec<_> = captured
        .into_iter()
        .filter(|metric| metric.name() == "component_discarded_events_total")
        .collect();
    assert_eq!(discarded.len(), 1);

    // Exactly the one cancelled event should have been counted.
    match discarded[0].value() {
        MetricValue::Counter { value } => assert_eq!(*value, 1.0),
        _ => panic!("component_discarded_events_total has invalid type"),
    }
}

/// A `send_batch` future that is cancelled part-way must report only the events it had not
/// yet sent as discarded.
#[tokio::test]
async fn emits_component_discarded_events_total_for_send_batch() {
    metrics::init_test();
    let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

    let expected_drop = 100;
    let total = CHUNK_SIZE + expected_drop;
    let events: Vec<Event> = std::iter::repeat_with(|| {
        Event::Metric(Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 123.4 },
        ))
    })
    .take(total)
    .collect();

    // `CHUNK_SIZE` events will be sent into buffer but then the future will not be polled to completion.
    let deadline = std::time::Duration::from_millis(100);
    let outcome = timeout(deadline, sender.send_batch(events)).await;
    assert!(outcome.is_err(), "Send should have timed out.");

    let captured = Controller::get()
        .expect("There must be a controller")
        .capture_metrics();
    let discarded: Vec<_> = captured
        .into_iter()
        .filter(|metric| metric.name() == "component_discarded_events_total")
        .collect();
    assert_eq!(discarded.len(), 1);

    // Only the events beyond the first chunk should have been counted.
    match discarded[0].value() {
        MetricValue::Counter { value } => assert_eq!(*value, expected_drop as f64),
        _ => panic!("component_discarded_events_total has invalid type"),
    }
}
}
4 changes: 2 additions & 2 deletions src/sources/mongodb_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ impl SourceConfig for MongoDbMetricsConfig {
while interval.next().await.is_some() {
let start = Instant::now();
let metrics = join_all(sources.iter().map(|mongodb| mongodb.collect())).await;
let count = metrics.len();
emit!(CollectionCompleted {
start,
end: Instant::now()
});

let metrics = metrics.into_iter().flatten();
let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
let count = metrics.len();

if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
Expand Down
4 changes: 2 additions & 2 deletions src/sources/nginx_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ impl SourceConfig for NginxMetricsConfig {
while interval.next().await.is_some() {
let start = Instant::now();
let metrics = join_all(sources.iter().map(|nginx| nginx.collect())).await;
let count = metrics.len();
emit!(CollectionCompleted {
start,
end: Instant::now()
});

let metrics = metrics.into_iter().flatten();
let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
let count = metrics.len();

if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
Expand Down
5 changes: 3 additions & 2 deletions src/sources/postgresql_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,14 @@ impl SourceConfig for PostgresqlMetricsConfig {
while interval.next().await.is_some() {
let start = Instant::now();
let metrics = join_all(sources.iter_mut().map(|source| source.collect())).await;
let count = metrics.len();
emit!(CollectionCompleted {
start,
end: Instant::now()
});

let metrics = metrics.into_iter().flatten();
let metrics: Vec<Metric> = metrics.into_iter().flatten().collect();
let count = metrics.len();

if (cx.out.send_batch(metrics).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
Expand Down
4 changes: 3 additions & 1 deletion src/topology/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,9 @@ async fn topology_swap_transform_is_atomic() {
}
};
let input = async move {
in1.send_batch(iter::from_fn(events)).await.unwrap();
in1.send_event_stream(stream::iter(iter::from_fn(events)))
.await
.unwrap();
};
let output = out1.for_each(move |_| {
recv_counter.fetch_add(1, Ordering::Release);
Expand Down