Skip to content
5 changes: 5 additions & 0 deletions lib/vector-common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ impl PartialOrd for ComponentKey {

impl ConfigurableString for ComponentKey {}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct SourceDetails {
pub key: ComponentKey,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
19 changes: 11 additions & 8 deletions lib/vector-common/src/internal_event/events_sent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,28 @@ pub struct EventsSent<'a> {
pub count: usize,
pub byte_size: usize,
pub output: Option<&'a str>,
pub source: Option<&'a str>,
}

impl<'a> InternalEvent for EventsSent<'a> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this source has changed recently and no longer contains this implementation. You'll need to update against current master.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Yeah, I did find that out during a rebase this morning. I've got some time scheduled tomorrow to rectify this.

fn emit(self) {
let source = self.source.unwrap_or("UNKNOWN");

if let Some(output) = self.output {
trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size, output = %output);
trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size, source = %source, output = %output);
} else {
trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size);
trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size, source = %source);
}

if self.count > 0 {
if let Some(output) = self.output {
counter!("component_sent_events_total", self.count as u64, "output" => output.to_owned());
counter!("events_out_total", self.count as u64, "output" => output.to_owned());
counter!("component_sent_event_bytes_total", self.byte_size as u64, "output" => output.to_owned());
counter!("component_sent_events_total", self.count as u64, "source" => source.to_owned(), "output" => output.to_owned());
counter!("events_out_total", self.count as u64, "source" => source.to_owned(), "output" => output.to_owned());
counter!("component_sent_event_bytes_total", self.byte_size as u64, "source" => source.to_owned(), "output" => output.to_owned());
} else {
counter!("component_sent_events_total", self.count as u64);
counter!("events_out_total", self.count as u64);
counter!("component_sent_event_bytes_total", self.byte_size as u64);
counter!("component_sent_events_total", self.count as u64, "source" => source.to_owned());
counter!("events_out_total", self.count as u64, "source" => source.to_owned());
counter!("component_sent_event_bytes_total", self.byte_size as u64, "source" => source.to_owned());
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions lib/vector-core/src/event/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ pub struct EventMetadata {
/// TODO(Jean): must not skip serialization to track schemas across restarts.
#[serde(default = "default_schema_definition", skip)]
schema_definition: Arc<schema::Definition>,

/// A unique identifier of the originating source of this event.
///
/// Can be used internally to refer back to source details such as its component key.
///
/// If `None`, then the event has no originating source (e.g. it was created internally, such
/// as in the Lua or Remap transforms).
source_id: Option<usize>,
}

fn default_metadata_value() -> Value {
Expand Down Expand Up @@ -98,6 +106,7 @@ impl Default for EventMetadata {
secrets: Secrets::new(),
finalizers: Default::default(),
schema_definition: default_schema_definition(),
source_id: None,
}
}
}
Expand Down Expand Up @@ -202,6 +211,16 @@ impl EventMetadata {
pub fn set_schema_definition(&mut self, definition: &Arc<schema::Definition>) {
self.schema_definition = Arc::clone(definition);
}

/// set the source ID.
pub fn set_source_id(&mut self, source_id: usize) {
self.source_id = Some(source_id);
}

/// Get the source ID.
pub fn source_id(&self) -> Option<usize> {
self.source_id
}
}

impl EventDataEq for EventMetadata {
Expand Down
30 changes: 30 additions & 0 deletions lib/vector-core/src/event/ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

use vector_common::EventDataEq;

use crate::EstimatedJsonEncodedSizeOf;

use super::{Event, EventMetadata, LogEvent, Metric, TraceEvent};

/// A wrapper for references to inner event types, where reconstituting
Expand Down Expand Up @@ -64,6 +66,15 @@ impl<'a> EventRef<'a> {
_ => panic!("Failed type coercion, {:?} is not a metric reference", self),
}
}

/// Access the metadata in this reference.
pub fn metadata(&self) -> &EventMetadata {
match self {
Self::Log(event) => event.metadata(),
Self::Metric(event) => event.metadata(),
Self::Trace(event) => event.metadata(),
}
}
}

impl<'a> From<&'a Event> for EventRef<'a> {
Expand Down Expand Up @@ -105,6 +116,16 @@ impl<'a> EventDataEq<Event> for EventRef<'a> {
}
}

impl<'a> EstimatedJsonEncodedSizeOf for EventRef<'a> {
fn estimated_json_encoded_size_of(&self) -> usize {
match self {
EventRef::Log(v) => v.estimated_json_encoded_size_of(),
EventRef::Metric(v) => v.estimated_json_encoded_size_of(),
EventRef::Trace(v) => v.estimated_json_encoded_size_of(),
}
}
}

/// A wrapper for mutable references to inner event types, where reconstituting
/// a full `Event` from a `LogEvent` or `Metric` might be inconvenient.
#[derive(Debug)]
Expand All @@ -118,6 +139,15 @@ pub enum EventMutRef<'a> {
}

impl<'a> EventMutRef<'a> {
/// Convert an `EventMutRef` to an `EventRef`.
pub fn as_event_ref(&'a self) -> EventRef<'a> {
match self {
EventMutRef::Log(v) => EventRef::Log(v),
EventMutRef::Metric(v) => EventRef::Metric(v),
EventMutRef::Trace(v) => EventRef::Trace(v),
}
}

/// Extract the `LogEvent` reference in this.
///
/// # Panics
Expand Down
1 change: 1 addition & 0 deletions lib/vector-core/src/stream/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ where
count: cbs.0,
byte_size: cbs.1,
output: None,
source: None,
});

// This condition occurs specifically when the `HttpBatchService::call()` is called *within* the `Service::call()`
Expand Down
77 changes: 56 additions & 21 deletions lib/vector-core/src/transform/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::{collections::HashMap, error, pin::Pin};

use futures::{Stream, StreamExt};
use vector_common::{
config::SourceDetails,
internal_event::{emit, EventsSent, DEFAULT_OUTPUT},
EventDataEq,
};
Expand Down Expand Up @@ -220,10 +221,14 @@ pub struct TransformOutputs {
outputs_spec: Vec<Output>,
primary_output: Option<Fanout>,
named_outputs: HashMap<String, Fanout>,
sources_details: Vec<SourceDetails>,
}

impl TransformOutputs {
pub fn new(outputs_in: Vec<Output>) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
pub fn new(
outputs_in: Vec<Output>,
sources_details: Vec<SourceDetails>,
) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
let outputs_spec = outputs_in.clone();
let mut primary_output = None;
let mut named_outputs = HashMap::new();
Expand All @@ -247,6 +252,7 @@ impl TransformOutputs {
outputs_spec,
primary_output,
named_outputs,
sources_details,
};

(me, controls)
Expand All @@ -267,33 +273,62 @@ impl TransformOutputs {
buf: &mut TransformOutputsBuf,
) -> Result<(), Box<dyn error::Error + Send + Sync>> {
if let Some(primary) = self.primary_output.as_mut() {
let count = buf.primary_buffer.as_ref().map_or(0, OutputBuffer::len);
let byte_size = buf.primary_buffer.as_ref().map_or(
0,
EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of,
);
buf.primary_buffer
.as_mut()
.expect("mismatched outputs")
.send(primary)
.await?;
emit(EventsSent {
count,
byte_size,
output: Some(DEFAULT_OUTPUT),
let primary_buffer = buf.primary_buffer.as_mut().expect("mismatched outputs");

primary_buffer.send(primary).await?;

let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new();
primary_buffer.iter_events().for_each(|event| {
let size = event.estimated_json_encoded_size_of();

sources
.entry(event.metadata().source_id())
.and_modify(|(count, byte_size)| {
*count += 1;
*byte_size += size;
})
.or_insert((1, size));
});

for (source_id, (count, byte_size)) in sources {
emit(EventsSent {
count,
byte_size,
output: Some(DEFAULT_OUTPUT),
source: source_id.and_then(|id| {
self.sources_details.get(id).map(|details| details.key.id())
}),
});
}
}

for (key, buf) in &mut buf.named_buffers {
let count = buf.len();
let byte_size = buf.estimated_json_encoded_size_of();
buf.send(self.named_outputs.get_mut(key).expect("unknown output"))
.await?;
emit(EventsSent {
count,
byte_size,
output: Some(key.as_ref()),

let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new();
buf.iter_events().for_each(|event| {
let size = event.estimated_json_encoded_size_of();

sources
.entry(event.metadata().source_id())
.and_modify(|(count, byte_size)| {
*count += 1;
*byte_size += size;
})
.or_insert((1, size));
});

for (source_id, (count, byte_size)) in sources {
emit(EventsSent {
count,
byte_size,
output: Some(key.as_ref()),
source: source_id.and_then(|id| {
self.sources_details.get(id).map(|details| details.key.id())
}),
});
}
}

Ok(())
Expand Down
11 changes: 10 additions & 1 deletion src/config/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ use async_trait::async_trait;
use enum_dispatch::enum_dispatch;
use serde::Serialize;
use vector_buffers::{BufferConfig, BufferType};
use vector_common::config::SourceDetails;
use vector_config::{configurable_component, Configurable, NamedComponent};
use vector_core::{
config::{AcknowledgementsConfig, GlobalOptions, Input},
sink::VectorSink,
};

use super::{id::Inputs, schema, ComponentKey, ProxyConfig, Resource};
use crate::sinks::{util::UriSerde, Healthcheck, Sinks};
use crate::{
sinks::{util::UriSerde, Healthcheck, Sinks},
};

/// Fully resolved sink component.
#[configurable_component]
Expand Down Expand Up @@ -211,6 +214,7 @@ pub struct SinkContext {
pub globals: GlobalOptions,
pub proxy: ProxyConfig,
pub schema: schema::Options,
pub sources_details: Vec<SourceDetails>,
}

impl SinkContext {
Expand All @@ -221,6 +225,7 @@ impl SinkContext {
globals: GlobalOptions::default(),
proxy: ProxyConfig::default(),
schema: schema::Options::default(),
sources_details: vec![],
}
}

Expand All @@ -231,4 +236,8 @@ impl SinkContext {
pub const fn proxy(&self) -> &ProxyConfig {
&self.proxy
}

pub fn source_details(&self, id: usize) -> Option<&SourceDetails> {
self.sources_details.get(id)
}
}
5 changes: 3 additions & 2 deletions src/sinks/blackhole/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ pub struct BlackholeConfig {

#[async_trait::async_trait]
impl SinkConfig for BlackholeConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sink = BlackholeSink::new(self.clone());
async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sources_details = ctx.sources_details.clone();
let sink = BlackholeSink::new(self.clone(), sources_details);
let healthcheck = future::ok(()).boxed();

Ok((VectorSink::Stream(Box::new(sink)), healthcheck))
Expand Down
Loading