diff --git a/lib/vector-common/src/config.rs b/lib/vector-common/src/config.rs
index 1c3f568d00880..8143332b260d3 100644
--- a/lib/vector-common/src/config.rs
+++ b/lib/vector-common/src/config.rs
@@ -80,6 +80,11 @@ impl PartialOrd for ComponentKey {
 
 impl ConfigurableString for ComponentKey {}
 
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct SourceDetails {
+    pub key: ComponentKey,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/lib/vector-common/src/internal_event/events_sent.rs b/lib/vector-common/src/internal_event/events_sent.rs
index ce1517af1a412..8b823b7028290 100644
--- a/lib/vector-common/src/internal_event/events_sent.rs
+++ b/lib/vector-common/src/internal_event/events_sent.rs
@@ -10,25 +10,28 @@ pub struct EventsSent<'a> {
     pub count: usize,
     pub byte_size: usize,
     pub output: Option<&'a str>,
+    pub source: Option<&'a str>,
 }
 
 impl<'a> InternalEvent for EventsSent<'a> {
     fn emit(self) {
+        let source = self.source.unwrap_or("UNKNOWN");
+
         if let Some(output) = self.output {
-            trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size, output = %output);
+            trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size, source = %source, output = %output);
         } else {
-            trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size);
+            trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size, source = %source);
         }
 
         if self.count > 0 {
             if let Some(output) = self.output {
-                counter!("component_sent_events_total", self.count as u64, "output" => output.to_owned());
-                counter!("events_out_total", self.count as u64, "output" => output.to_owned());
-                counter!("component_sent_event_bytes_total", self.byte_size as u64, "output" => output.to_owned());
+                counter!("component_sent_events_total", self.count as u64, "source" => source.to_owned(), "output" => output.to_owned());
+                counter!("events_out_total", self.count as u64, "source" => source.to_owned(), "output" => output.to_owned());
+                counter!("component_sent_event_bytes_total", self.byte_size as u64, "source" => source.to_owned(), "output" => output.to_owned());
             } else {
-                counter!("component_sent_events_total", self.count as u64);
-                counter!("events_out_total", self.count as u64);
-                counter!("component_sent_event_bytes_total", self.byte_size as u64);
+                counter!("component_sent_events_total", self.count as u64, "source" => source.to_owned());
+                counter!("events_out_total", self.count as u64, "source" => source.to_owned());
+                counter!("component_sent_event_bytes_total", self.byte_size as u64, "source" => source.to_owned());
             }
         }
     }
diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs
index 9b292c8c64469..d658f3c03e34d 100644
--- a/lib/vector-core/src/event/metadata.rs
+++ b/lib/vector-core/src/event/metadata.rs
@@ -35,6 +35,14 @@ pub struct EventMetadata {
     /// TODO(Jean): must not skip serialization to track schemas across restarts.
     #[serde(default = "default_schema_definition", skip)]
     schema_definition: Arc<schema::Definition>,
+
+    /// A unique identifier of the originating source of this event.
+    ///
+    /// Can be used internally to refer back to source details such as its component key.
+    ///
+    /// If `None`, then the event has no originating source (e.g. it was created internally, such
+    /// as in the Lua or Remap transforms).
+    source_id: Option<usize>,
 }
 
 fn default_metadata_value() -> Value {
@@ -98,6 +106,7 @@ impl Default for EventMetadata {
             secrets: Secrets::new(),
             finalizers: Default::default(),
             schema_definition: default_schema_definition(),
+            source_id: None,
         }
     }
 }
@@ -202,6 +211,16 @@ impl EventMetadata {
     pub fn set_schema_definition(&mut self, definition: &Arc<schema::Definition>) {
         self.schema_definition = Arc::clone(definition);
     }
+
+    /// Set the source ID.
+    pub fn set_source_id(&mut self, source_id: usize) {
+        self.source_id = Some(source_id);
+    }
+
+    /// Get the source ID.
+    pub fn source_id(&self) -> Option<usize> {
+        self.source_id
+    }
 }
 
 impl EventDataEq for EventMetadata {
diff --git a/lib/vector-core/src/event/ref.rs b/lib/vector-core/src/event/ref.rs
index cc8b3c7512d25..85324d5d82adf 100644
--- a/lib/vector-core/src/event/ref.rs
+++ b/lib/vector-core/src/event/ref.rs
@@ -2,6 +2,8 @@
 
 use vector_common::EventDataEq;
 
+use crate::EstimatedJsonEncodedSizeOf;
+
 use super::{Event, EventMetadata, LogEvent, Metric, TraceEvent};
 
 /// A wrapper for references to inner event types, where reconstituting
@@ -64,6 +66,15 @@ impl<'a> EventRef<'a> {
             _ => panic!("Failed type coercion, {:?} is not a metric reference", self),
         }
     }
+
+    /// Access the metadata in this reference.
+    pub fn metadata(&self) -> &EventMetadata {
+        match self {
+            Self::Log(event) => event.metadata(),
+            Self::Metric(event) => event.metadata(),
+            Self::Trace(event) => event.metadata(),
+        }
+    }
 }
 
 impl<'a> From<&'a Event> for EventRef<'a> {
@@ -105,6 +116,16 @@ impl<'a> EventDataEq for EventRef<'a> {
     }
 }
 
+impl<'a> EstimatedJsonEncodedSizeOf for EventRef<'a> {
+    fn estimated_json_encoded_size_of(&self) -> usize {
+        match self {
+            EventRef::Log(v) => v.estimated_json_encoded_size_of(),
+            EventRef::Metric(v) => v.estimated_json_encoded_size_of(),
+            EventRef::Trace(v) => v.estimated_json_encoded_size_of(),
+        }
+    }
+}
+
 /// A wrapper for mutable references to inner event types, where reconstituting
 /// a full `Event` from a `LogEvent` or `Metric` might be inconvenient.
 #[derive(Debug)]
@@ -118,6 +139,15 @@ pub enum EventMutRef<'a> {
 }
 
 impl<'a> EventMutRef<'a> {
+    /// Convert an `EventMutRef` to an `EventRef`.
+    pub fn as_event_ref(&'a self) -> EventRef<'a> {
+        match self {
+            EventMutRef::Log(v) => EventRef::Log(v),
+            EventMutRef::Metric(v) => EventRef::Metric(v),
+            EventMutRef::Trace(v) => EventRef::Trace(v),
+        }
+    }
+
     /// Extract the `LogEvent` reference in this.
     ///
     /// # Panics
diff --git a/lib/vector-core/src/stream/driver.rs b/lib/vector-core/src/stream/driver.rs
index f1b1988582e42..5ca4d46636ddb 100644
--- a/lib/vector-core/src/stream/driver.rs
+++ b/lib/vector-core/src/stream/driver.rs
@@ -192,6 +192,7 @@ where
                             count: cbs.0,
                             byte_size: cbs.1,
                             output: None,
+                            source: None,
                         });
 
                         // This condition occurs specifically when the `HttpBatchService::call()` is called *within* the `Service::call()`
diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs
index 142087cd94493..ac66adfbb37b4 100644
--- a/lib/vector-core/src/transform/mod.rs
+++ b/lib/vector-core/src/transform/mod.rs
@@ -2,6 +2,7 @@ use std::{collections::HashMap, error, pin::Pin};
 
 use futures::{Stream, StreamExt};
 use vector_common::{
+    config::SourceDetails,
     internal_event::{emit, EventsSent, DEFAULT_OUTPUT},
     EventDataEq,
 };
@@ -220,10 +221,14 @@ pub struct TransformOutputs {
     outputs_spec: Vec<Output>,
     primary_output: Option<Fanout>,
     named_outputs: HashMap<String, Fanout>,
+    sources_details: Vec<SourceDetails>,
 }
 
 impl TransformOutputs {
-    pub fn new(outputs_in: Vec<Output>) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
+    pub fn new(
+        outputs_in: Vec<Output>,
+        sources_details: Vec<SourceDetails>,
+    ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
         let outputs_spec = outputs_in.clone();
         let mut primary_output = None;
         let mut named_outputs = HashMap::new();
@@ -247,6 +252,7 @@ impl TransformOutputs {
             outputs_spec,
             primary_output,
             named_outputs,
+            sources_details,
         };
 
         (me, controls)
@@ -267,33 +273,62 @@ pub async fn send(
         &mut self,
         buf: &mut TransformOutputsBuf,
     ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
         if let Some(primary) = self.primary_output.as_mut() {
-            let count = buf.primary_buffer.as_ref().map_or(0, OutputBuffer::len);
-            let byte_size = buf.primary_buffer.as_ref().map_or(
-                0,
-                EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of,
-            );
-            buf.primary_buffer
-                .as_mut()
-                .expect("mismatched outputs")
-                .send(primary)
-                .await?;
-            emit(EventsSent {
-                count,
-                byte_size,
-                output: Some(DEFAULT_OUTPUT),
+            let primary_buffer = buf.primary_buffer.as_mut().expect("mismatched outputs");
+
+            primary_buffer.send(primary).await?;
+
+            let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new();
+            primary_buffer.iter_events().for_each(|event| {
+                let size = event.estimated_json_encoded_size_of();
+
+                sources
+                    .entry(event.metadata().source_id())
+                    .and_modify(|(count, byte_size)| {
+                        *count += 1;
+                        *byte_size += size;
+                    })
+                    .or_insert((1, size));
             });
+
+            for (source_id, (count, byte_size)) in sources {
+                emit(EventsSent {
+                    count,
+                    byte_size,
+                    output: Some(DEFAULT_OUTPUT),
+                    source: source_id.and_then(|id| {
+                        self.sources_details.get(id).map(|details| details.key.id())
+                    }),
+                });
+            }
         }
 
         for (key, buf) in &mut buf.named_buffers {
-            let count = buf.len();
-            let byte_size = buf.estimated_json_encoded_size_of();
             buf.send(self.named_outputs.get_mut(key).expect("unknown output"))
                 .await?;
-            emit(EventsSent {
-                count,
-                byte_size,
-                output: Some(key.as_ref()),
+
+            let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new();
+            buf.iter_events().for_each(|event| {
+                let size = event.estimated_json_encoded_size_of();
+
+                sources
+                    .entry(event.metadata().source_id())
+                    .and_modify(|(count, byte_size)| {
+                        *count += 1;
+                        *byte_size += size;
+                    })
+                    .or_insert((1, size));
             });
+
+            for (source_id, (count, byte_size)) in sources {
+                emit(EventsSent {
+                    count,
+                    byte_size,
+                    output: Some(key.as_ref()),
+                    source: source_id.and_then(|id| {
+                        self.sources_details.get(id).map(|details| details.key.id())
+                    }),
+                });
+            }
         }
 
         Ok(())
diff --git a/src/config/sink.rs b/src/config/sink.rs
index c777525ebcb68..e10e6739ce22f 100644
--- a/src/config/sink.rs
+++ b/src/config/sink.rs
@@ -2,6 +2,7 @@ use async_trait::async_trait;
 use enum_dispatch::enum_dispatch;
 use serde::Serialize;
 use vector_buffers::{BufferConfig, BufferType};
+use vector_common::config::SourceDetails;
 use vector_config::{configurable_component, Configurable, NamedComponent};
 use vector_core::{
     config::{AcknowledgementsConfig, GlobalOptions, Input},
@@ -9,7 +10,9 @@ use vector_core::{
 };
 
 use super::{id::Inputs, schema, ComponentKey, ProxyConfig, Resource};
-use crate::sinks::{util::UriSerde, Healthcheck, Sinks};
+use crate::{
+    sinks::{util::UriSerde, Healthcheck, Sinks},
+};
 
 /// Fully resolved sink component.
 #[configurable_component]
@@ -211,6 +214,7 @@ pub struct SinkContext {
     pub globals: GlobalOptions,
     pub proxy: ProxyConfig,
     pub schema: schema::Options,
+    pub sources_details: Vec<SourceDetails>,
 }
 
 impl SinkContext {
@@ -221,6 +225,7 @@ impl SinkContext {
             globals: GlobalOptions::default(),
             proxy: ProxyConfig::default(),
             schema: schema::Options::default(),
+            sources_details: vec![],
         }
     }
 
@@ -231,4 +236,8 @@ impl SinkContext {
     pub const fn proxy(&self) -> &ProxyConfig {
         &self.proxy
     }
+
+    pub fn source_details(&self, id: usize) -> Option<&SourceDetails> {
+        self.sources_details.get(id)
+    }
 }
diff --git a/src/sinks/blackhole/config.rs b/src/sinks/blackhole/config.rs
index 5777afab4d7a4..b10c88483bf8b 100644
--- a/src/sinks/blackhole/config.rs
+++ b/src/sinks/blackhole/config.rs
@@ -39,8 +39,9 @@ pub struct BlackholeConfig {
 
 #[async_trait::async_trait]
 impl SinkConfig for BlackholeConfig {
-    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
-        let sink = BlackholeSink::new(self.clone());
+    async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
+        let sources_details = ctx.sources_details.clone();
+        let sink = BlackholeSink::new(self.clone(), sources_details);
         let healthcheck = future::ok(()).boxed();
 
         Ok((VectorSink::Stream(Box::new(sink)), healthcheck))
diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs
index ba6d8b8b62d67..85af4b678f4c0 100644
--- a/src/sinks/blackhole/sink.rs
+++ b/src/sinks/blackhole/sink.rs
@@ -1,4 +1,5 @@
 use std::{
+    collections::HashMap,
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -13,6 +14,7 @@ use tokio::{
     sync::watch,
     time::{interval, sleep_until},
 };
+use vector_common::config::SourceDetails;
 use vector_core::{
     internal_event::{BytesSent, EventsSent},
     EstimatedJsonEncodedSizeOf,
@@ -28,15 +30,17 @@ pub struct BlackholeSink {
     total_raw_bytes: Arc<AtomicUsize>,
     config: BlackholeConfig,
     last: Option<Instant>,
+    sources_details: Vec<SourceDetails>,
 }
 
 impl BlackholeSink {
-    pub fn new(config: BlackholeConfig) -> Self {
+    pub fn new(config: BlackholeConfig, sources_details: Vec<SourceDetails>) -> Self {
         BlackholeSink {
             config,
             total_events: Arc::new(AtomicUsize::new(0)),
             total_raw_bytes: Arc::new(AtomicUsize::new(0)),
             last: None,
+            sources_details,
         }
     }
 }
@@ -85,23 +89,37 @@ impl StreamSink<EventArray> for BlackholeSink {
             self.last = Some(until);
         }
 
-        let message_len = events.estimated_json_encoded_size_of();
+        let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new();
+        events.iter_events().for_each(|event| {
+            let size = event.estimated_json_encoded_size_of();
 
-        let _ = self.total_events.fetch_add(events.len(), Ordering::AcqRel);
-        let _ = self
-            .total_raw_bytes
-            .fetch_add(message_len, Ordering::AcqRel);
-
-        emit!(EventsSent {
-            count: events.len(),
-            byte_size: message_len,
-            output: None,
+            sources
+                .entry(event.metadata().source_id())
+                .and_modify(|(count, byte_size)| {
+                    *count += 1;
+                    *byte_size += size;
+                })
+                .or_insert((1, size));
         });
 
-        emit!(BytesSent {
-            byte_size: message_len,
-            protocol: "blackhole".to_string().into(),
-        });
+        for (source_id, (count, byte_size)) in sources {
+            let _ = self.total_events.fetch_add(count, Ordering::AcqRel);
+            let _ = self.total_raw_bytes.fetch_add(byte_size, Ordering::AcqRel);
+
+            emit!(EventsSent {
+                count,
+                byte_size,
+                output: None,
+                source: source_id.and_then(|id| {
+                    self.sources_details.get(id).map(|details| details.key.id())
+                }),
+            });
+
+            emit!(BytesSent {
+                byte_size,
+                protocol: "blackhole".to_string().into(),
+            });
+        }
     }
 
     // Notify the reporting task to shutdown.
diff --git a/src/sinks/console/config.rs b/src/sinks/console/config.rs
index 7e4ca5e44fb59..bb81659b5ef62 100644
--- a/src/sinks/console/config.rs
+++ b/src/sinks/console/config.rs
@@ -60,7 +60,8 @@ impl GenerateConfig for ConsoleSinkConfig {
 
 #[async_trait::async_trait]
 impl SinkConfig for ConsoleSinkConfig {
-    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
+    async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
+        let sources_details = ctx.sources_details.clone();
         let transformer = self.encoding.transformer();
         let (framer, serializer) = self.encoding.build(SinkType::StreamBased)?;
         let encoder = Encoder::<Framer>::new(framer, serializer);
@@ -70,11 +71,13 @@ impl SinkConfig for ConsoleSinkConfig {
                 output: io::stdout(),
                 transformer,
                 encoder,
+                sources_details,
             }),
             Target::Stderr => VectorSink::from_event_streamsink(WriterSink {
                 output: io::stderr(),
                 transformer,
                 encoder,
+                sources_details,
             }),
         };
diff --git a/src/sinks/console/sink.rs b/src/sinks/console/sink.rs
index d303131fcf2fe..5c64cbeb01f0c 100644
--- a/src/sinks/console/sink.rs
+++ b/src/sinks/console/sink.rs
@@ -4,6 +4,7 @@ use codecs::encoding::Framer;
 use futures::{stream::BoxStream, StreamExt};
 use tokio::{io, io::AsyncWriteExt};
 use tokio_util::codec::Encoder as _;
+use vector_common::config::SourceDetails;
 use vector_core::{
     internal_event::{ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol},
     EstimatedJsonEncodedSizeOf,
@@ -19,6 +20,7 @@ pub struct WriterSink<T> {
     pub output: T,
     pub transformer: Transformer,
     pub encoder: Encoder<Framer>,
+    pub sources_details: Vec<SourceDetails>,
 }
 
 #[async_trait]
@@ -30,6 +32,11 @@ where
         let bytes_sent = register!(BytesSent::from(Protocol("console".into(),)));
         while let Some(mut event) = input.next().await {
             let event_byte_size = event.estimated_json_encoded_size_of();
+            let source = event
+                .metadata()
+                .source_id()
+                .and_then(|id| self.sources_details.get(id).map(|details| details.key.id()));
+
             self.transformer.transform(&mut event);
 
             let finalizers = event.take_finalizers();
@@ -54,6 +61,7 @@ where
                         byte_size: event_byte_size,
                         count: 1,
                         output: None,
+                        source,
                     });
                     bytes_sent.emit(ByteSize(bytes.len()));
                 }
diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs
index f0e21ada44f75..4cd7a68cb305d 100644
--- a/src/sinks/file/mod.rs
+++ b/src/sinks/file/mod.rs
@@ -17,6 +17,7 @@ use tokio::{
     io::AsyncWriteExt,
 };
 use tokio_util::codec::Encoder as _;
+use vector_common::config::SourceDetails;
 use vector_config::configurable_component;
 use vector_core::{internal_event::EventsSent, EstimatedJsonEncodedSizeOf};
 
@@ -152,9 +153,9 @@ impl OutFile {
 impl SinkConfig for FileSinkConfig {
     async fn build(
         &self,
-        _cx: SinkContext,
+        ctx: SinkContext,
     ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
-        let sink = FileSink::new(self)?;
+        let sink = FileSink::new(self, ctx.sources_details.clone())?;
         Ok((
             super::VectorSink::from_event_streamsink(sink),
             future::ok(()).boxed(),
@@ -178,10 +179,14 @@ pub struct FileSink {
     idle_timeout: Duration,
     files: ExpiringHashMap<Bytes, OutFile>,
     compression: Compression,
+    sources_details: Vec<SourceDetails>,
 }
 
 impl FileSink {
-    pub fn new(config: &FileSinkConfig) -> crate::Result<Self> {
+    pub fn new(
+        config: &FileSinkConfig,
+        sources_details: Vec<SourceDetails>,
+    ) -> crate::Result<Self> {
         let transformer = config.encoding.transformer();
         let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
         let encoder = Encoder::<Framer>::new(framer, serializer);
@@ -193,6 +198,7 @@ impl FileSink {
             idle_timeout: Duration::from_secs(config.idle_timeout_secs.unwrap_or(30)),
             files: ExpiringHashMap::default(),
             compression: config.compression,
+            sources_details,
         })
     }
 
@@ -335,6 +341,11 @@ impl FileSink {
         trace!(message = "Writing an event to file.", path = ?path);
         let event_size = event.estimated_json_encoded_size_of();
         let finalizers = event.take_finalizers();
+        let source = event
+            .metadata()
+            .source_id()
+            .and_then(|id| self.sources_details.get(id).map(|details| details.key.id()));
+
         match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await {
             Ok(byte_size) => {
                 finalizers.update_status(EventStatus::Delivered);
@@ -342,6 +353,7 @@ impl FileSink {
                     count: 1,
                     byte_size: event_size,
                     output: None,
+                    source,
                 });
                 emit!(FileBytesSent {
                     byte_size,
@@ -439,7 +451,7 @@ mod tests {
             acknowledgements: Default::default(),
         };
 
-        let sink = FileSink::new(&config).unwrap();
+        let sink = FileSink::new(&config, vec![]).unwrap();
         let (input, _events) = random_lines_with_stream(100, 64, None);
 
         let events = Box::pin(stream::iter(
@@ -475,7 +487,7 @@ mod tests {
             acknowledgements: Default::default(),
         };
 
-        let sink = FileSink::new(&config).unwrap();
+        let sink = FileSink::new(&config, vec![]).unwrap();
         let (input, _) = random_lines_with_stream(100, 64, None);
 
         let events = Box::pin(stream::iter(
@@ -511,7 +523,7 @@ mod tests {
             acknowledgements: Default::default(),
         };
 
-        let sink = FileSink::new(&config).unwrap();
+        let sink = FileSink::new(&config, vec![]).unwrap();
         let (input, _) = random_lines_with_stream(100, 64, None);
 
         let events = Box::pin(stream::iter(
@@ -552,7 +564,7 @@ mod tests {
             acknowledgements: Default::default(),
         };
 
-        let sink = FileSink::new(&config).unwrap();
+        let sink = FileSink::new(&config, vec![]).unwrap();
         let (mut input, _events) = random_events_with_stream(32, 8, None);
         input[0].as_mut_log().insert("date", "2019-26-07");
@@ -637,7 +649,7 @@ mod tests {
             acknowledgements: Default::default(),
         };
 
-        let sink = FileSink::new(&config).unwrap();
+        let sink = FileSink::new(&config, vec![]).unwrap();
         let (mut input, _events) = random_lines_with_stream(10, 64, None);
 
         let (mut tx, rx) = futures::channel::mpsc::channel(0);
diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs
index 307e9c80040e5..d16ac9a89b33b 100644
--- a/src/sinks/nats.rs
+++ b/src/sinks/nats.rs
@@ -6,8 +6,9 @@ use codecs::JsonSerializerConfig;
 use futures::{stream::BoxStream, FutureExt, StreamExt, TryFutureExt};
 use snafu::{ResultExt, Snafu};
 use tokio_util::codec::Encoder as _;
-use vector_common::internal_event::{
-    ByteSize, BytesSent, EventsSent, InternalEventHandle, Protocol,
+use vector_common::{
+    config::SourceDetails,
+    internal_event::{ByteSize, BytesSent, EventsSent, InternalEventHandle, Protocol},
 };
 use vector_config::configurable_component;
 
@@ -99,9 +100,9 @@ impl GenerateConfig for NatsSinkConfig {
 impl SinkConfig for NatsSinkConfig {
     async fn build(
         &self,
-        _cx: SinkContext,
+        ctx: SinkContext,
     ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
-        let sink = NatsSink::new(self.clone()).await?;
+        let sink = NatsSink::new(self.clone(), ctx).await?;
         let healthcheck = healthcheck(self.clone()).boxed();
         Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
     }
@@ -140,20 +141,23 @@ pub struct NatsSink {
     encoder: Encoder<()>,
     connection: nats::asynk::Connection,
     subject: Template,
+    sources_details: Vec<SourceDetails>,
 }
 
 impl NatsSink {
-    async fn new(config: NatsSinkConfig) -> Result<Self, BuildError> {
+    async fn new(config: NatsSinkConfig, ctx: SinkContext) -> Result<Self, BuildError> {
         let connection = config.connect().await?;
         let transformer = config.encoding.transformer();
         let serializer = config.encoding.build().context(EncodingSnafu)?;
         let encoder = Encoder::<()>::new(serializer);
+        let sources_details = ctx.sources_details.clone();
 
         Ok(NatsSink {
             connection,
             transformer,
             encoder,
             subject: Template::try_from(config.subject).context(SubjectTemplateSnafu)?,
+            sources_details,
         })
     }
 }
@@ -182,6 +186,10 @@ impl StreamSink<Event> for NatsSink {
             self.transformer.transform(&mut event);
 
             let event_byte_size = event.estimated_json_encoded_size_of();
+            let source = event
+                .metadata()
+                .source_id()
+                .and_then(|id| self.sources_details.get(id).map(|details| details.key.id()));
 
             let mut bytes = BytesMut::new();
             if self.encoder.encode(event, &mut bytes).is_err() {
@@ -202,7 +210,8 @@ impl StreamSink<Event> for NatsSink {
                 emit!(EventsSent {
                     byte_size: event_byte_size,
                     count: 1,
-                    output: None
+                    output: None,
+                    source,
                 });
                 bytes_sent.emit(ByteSize(bytes.len()));
             }
diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs
index 078a948e3b70e..63661854ef739 100644
--- a/src/sinks/prometheus/exporter.rs
+++ b/src/sinks/prometheus/exporter.rs
@@ -1,4 +1,5 @@
 use std::{
+    collections::HashMap,
     convert::Infallible,
     hash::Hash,
     mem::{discriminant, Discriminant},
@@ -19,10 +20,11 @@ use serde_with::serde_as;
 use snafu::Snafu;
 use stream_cancel::{Trigger, Tripwire};
 use tracing::{Instrument, Span};
+use vector_common::config::SourceDetails;
 use vector_config::configurable_component;
 use vector_core::{
     internal_event::{
-        ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol, Registered,
+        emit, ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol, Registered,
     },
     ByteSizeOf, EstimatedJsonEncodedSizeOf,
 };
@@ -180,7 +182,7 @@ impl GenerateConfig for PrometheusExporterConfig {
 
 #[async_trait::async_trait]
 impl SinkConfig for PrometheusExporterConfig {
-    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
+    async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
         if self.flush_period_secs.as_secs() < MIN_FLUSH_PERIOD_SECS {
             return Err(Box::new(BuildError::FlushPeriodTooShort {
                 min: MIN_FLUSH_PERIOD_SECS,
@@ -189,7 +191,8 @@ impl SinkConfig for PrometheusExporterConfig {
 
         validate_quantiles(&self.quantiles)?;
 
-        let sink = PrometheusExporter::new(self.clone());
+        let sources_details = ctx.sources_details;
+        let sink = PrometheusExporter::new(self.clone(), sources_details);
         let healthcheck = future::ok(()).boxed();
 
         Ok((VectorSink::from_event_streamsink(sink), healthcheck))
@@ -212,6 +215,7 @@ struct PrometheusExporter {
     server_shutdown_trigger: Option<Trigger>,
     config: PrometheusExporterConfig,
     metrics: Arc<RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>>,
+    sources_details: Vec<SourceDetails>,
 }
 
 /// Expiration metadata for a metric.
@@ -378,6 +382,7 @@ fn handle(
     quantiles: &[f64],
     metrics: &RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>,
     bytes_sent: &Registered<BytesSent>,
+    sources_details: &[SourceDetails],
 ) -> Response<Body> {
     let mut response = Response::new(Body::empty());
 
@@ -393,15 +398,20 @@ fn handle(
         (true, &Method::GET, "/metrics") => {
             let metrics = metrics.read().expect(LOCK_FAILED);
 
-            let count = metrics.len();
-            let byte_size = metrics
-                .iter()
-                .map(|(_, (metric, _))| metric.estimated_json_encoded_size_of())
-                .sum();
-
             let mut collector = StringCollector::new();
 
+            let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new();
             for (_, (metric, _)) in metrics.iter() {
+                let size = metric.estimated_json_encoded_size_of();
+
+                sources
+                    .entry(metric.metadata().source_id())
+                    .and_modify(|(count, byte_size)| {
+                        *count += 1;
+                        *byte_size += size;
+                    })
+                    .or_insert((1, size));
+
                 collector.encode_metric(default_namespace, buckets, quantiles, metric);
             }
 
@@ -419,11 +429,15 @@ fn handle(
 
             bytes_sent.emit(ByteSize(body_size));
 
-            emit!(EventsSent {
-                count,
-                byte_size,
-                output: None
-            });
+            for (source_id, (count, byte_size)) in sources {
+                emit(EventsSent {
+                    count,
+                    byte_size,
+                    output: None,
+                    source: source_id
+                        .and_then(|id| sources_details.get(id).map(|details| details.key.id())),
+                });
+            }
         }
 
         (true, _, _) => {
@@ -435,11 +449,12 @@ fn handle(
 }
 
 impl PrometheusExporter {
-    fn new(config: PrometheusExporterConfig) -> Self {
+    fn new(config: PrometheusExporterConfig, sources_details: Vec<SourceDetails>) -> Self {
         Self {
             server_shutdown_trigger: None,
             config,
             metrics: Arc::new(RwLock::new(IndexMap::new())),
+            sources_details,
         }
     }
 
@@ -456,6 +471,7 @@ impl PrometheusExporter {
         let buckets = self.config.buckets.clone();
         let quantiles = self.config.quantiles.clone();
         let auth = self.config.auth.clone();
+        let sources_details = self.sources_details.clone();
 
         let new_service = make_service_fn(move |_| {
             let span = Span::current();
@@ -465,6 +481,7 @@ impl PrometheusExporter {
             let quantiles = quantiles.clone();
             let bytes_sent = bytes_sent.clone();
             let auth = auth.clone();
+            let sources_details = sources_details.clone();
 
             async move {
                 Ok::<_, Infallible>(service_fn(move |req| {
@@ -477,6 +494,7 @@ impl PrometheusExporter {
                         &quantiles,
                         &metrics,
                         &bytes_sent,
+                        &sources_details,
                     );
 
                     emit!(PrometheusServerRequestComplete {
diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs
index 33357acf4e1f8..c9d0858bb14b7 100644
--- a/src/sinks/pulsar.rs
+++ b/src/sinks/pulsar.rs
@@ -24,6 +24,7 @@ use snafu::{ResultExt, Snafu};
 use tokio_util::codec::Encoder as _;
 use value::Value;
 use vector_common::{
+    config::SourceDetails,
     internal_event::{
         ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol, Registered,
     },
@@ -121,6 +122,7 @@ enum PulsarSinkState {
                 Result<SendFuture, PulsarError>,
                 RequestMetadata,
                 EventFinalizers,
+                Option<usize>,
             ),
         >,
     ),
@@ -138,10 +140,12 @@ struct PulsarSink {
                 Result<CommandSendReceipt, PulsarError>,
                 RequestMetadata,
                 EventFinalizers,
+                Option<String>,
             ),
         >,
     >,
     bytes_sent: Registered<BytesSent>,
+    sources_details: Vec<SourceDetails>,
 }
 
 impl GenerateConfig for PulsarSinkConfig {
@@ -162,7 +166,7 @@ impl GenerateConfig for PulsarSinkConfig {
 impl SinkConfig for PulsarSinkConfig {
     async fn build(
         &self,
-        _cx: SinkContext,
+        ctx: SinkContext,
     ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
         let producer = self
             .create_pulsar_producer()
@@ -172,12 +176,14 @@ impl SinkConfig for PulsarSinkConfig {
         let transformer = self.encoding.transformer();
         let serializer = self.encoding.build()?;
         let encoder = Encoder::<()>::new(serializer);
+        let sources_details = ctx.sources_details.clone();
 
         let sink = PulsarSink::new(
             producer,
             transformer,
             encoder,
             self.partition_key_field.clone(),
+            sources_details,
         )?;
 
         let producer = self
@@ -257,6 +263,7 @@ impl PulsarSink {
         transformer: Transformer,
         encoder: Encoder<()>,
         partition_key_field: Option<String>,
+        sources_details: Vec<SourceDetails>,
     ) -> crate::Result<Self> {
         Ok(Self {
             transformer,
             encoder,
             state: PulsarSinkState::Ready(producer),
             in_flight: FuturesUnordered::new(),
             bytes_sent: register!(BytesSent::from(Protocol::TCP)),
             partition_key_field,
+            sources_details,
         })
     }
 
     fn poll_in_flight_prepare(&mut self, cx: &mut Context<'_>) -> Poll<()> {
         if let PulsarSinkState::Sending(fut) = &mut self.state {
-            let (producer, result, metadata, finalizers) = ready!(fut.as_mut().poll(cx));
+            let (producer, result, metadata, finalizers, source_id) = ready!(fut.as_mut().poll(cx));
+
+            let source = source_id.and_then(|id| {
+                self.sources_details
+                    .get(id)
+                    .map(|details| details.key.id().to_owned())
+            });
 
             self.state = PulsarSinkState::Ready(producer);
             self.in_flight.push(Box::pin(async move {
@@ -278,7 +292,8 @@ impl PulsarSink {
                 Ok(fut) => fut.await,
                 Err(error) => Err(error),
             };
-            (result, metadata, finalizers)
+
+            (result, metadata, finalizers, source)
         }));
     }
 
@@ -318,6 +333,8 @@ impl Sink<Event> for PulsarSink {
         let metadata_builder = RequestMetadataBuilder::from_events(&event);
 
         self.transformer.transform(&mut event);
+        let source_id = event.metadata().source_id();
+
         let finalizers = event.take_finalizers();
         let mut bytes = BytesMut::new();
         self.encoder.encode(event, &mut bytes).map_err(|_| {
@@ -346,7 +363,7 @@ impl Sink<Event> for PulsarSink {
             builder = builder.with_key(key);
         };
         let result = builder.send().await;
-        (producer, result, metadata, finalizers)
+        (producer, result, metadata, finalizers, source_id)
         })),
     );
 
@@ -359,7 +376,7 @@ impl Sink<Event> for PulsarSink {
         let this = Pin::into_inner(self);
         while !this.in_flight.is_empty() {
             match ready!(Pin::new(&mut this.in_flight).poll_next(cx)) {
-                Some((Ok(result), metadata, finalizers)) => {
+                Some((Ok(result), metadata, finalizers, source)) => {
                     trace!(
                         message = "Pulsar sink produced message.",
                         message_id = ?result.message_id,
@@ -373,12 +390,13 @@ impl Sink<Event> for PulsarSink {
                         count: metadata.event_count(),
                         byte_size: metadata.events_estimated_json_encoded_byte_size(),
                         output: None,
+                        source: source.as_deref(),
                     });
 
                     this.bytes_sent
                         .emit(ByteSize(metadata.request_encoded_size()));
                 }
-                Some((Err(error), metadata, finalizers)) => {
+                Some((Err(error), metadata, finalizers, _)) => {
                     finalizers.update_status(EventStatus::Errored);
                     emit!(PulsarSendingError {
                         error: Box::new(error),
diff --git a/src/sinks/util/sink.rs b/src/sinks/util/sink.rs
index 17be9f59f1d7b..18f586de6b181 100644
--- a/src/sinks/util/sink.rs
+++ b/src/sinks/util/sink.rs
@@ -451,7 +451,8 @@ where
             emit!(EventsSent {
                 count,
                 byte_size,
-                output: None
+                output: None,
+                source: None,
             });
             // TODO: Emit a BytesSent event here too
         }
diff --git a/src/sinks/websocket/config.rs b/src/sinks/websocket/config.rs
index c54aa1d7880e6..21f4e0dd8d189 100644
--- a/src/sinks/websocket/config.rs
+++ b/src/sinks/websocket/config.rs
@@ -65,9 +65,9 @@ impl GenerateConfig for WebSocketSinkConfig {
 
 #[async_trait::async_trait]
 impl SinkConfig for WebSocketSinkConfig {
-    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
+    async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
         let connector = self.build_connector()?;
-        let ws_sink = WebSocketSink::new(self, connector.clone())?;
+        let ws_sink = WebSocketSink::new(self, connector.clone(), ctx)?;
 
         Ok((
             VectorSink::from_event_streamsink(ws_sink),
diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs
index 9361ac457b462..dc2010b2aec6b 100644
--- a/src/sinks/websocket/sink.rs
+++ b/src/sinks/websocket/sink.rs
@@ -23,6 +23,7 @@ use tokio_tungstenite::{
     WebSocketStream as WsStream,
 };
 use tokio_util::codec::Encoder as _;
+use vector_common::config::SourceDetails;
 use vector_core::{
     internal_event::{ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol},
     EstimatedJsonEncodedSizeOf,
@@ -30,6 +31,7 @@ use vector_core::{
 
 use crate::{
     codecs::{Encoder, Transformer},
+    config::SinkContext,
     dns, emit,
     event::{Event, EventStatus, Finalizable},
     http::Auth,
@@ -193,10 +195,15 @@ pub struct WebSocketSink {
     connector: WebSocketConnector,
     ping_interval: Option<u64>,
     ping_timeout: Option<u64>,
+    sources_details: Vec<SourceDetails>,
 }
 
 impl WebSocketSink {
-    pub fn new(config: &WebSocketSinkConfig, connector: WebSocketConnector) -> crate::Result<Self> {
+    pub fn new(
+        config: &WebSocketSinkConfig,
+        connector: WebSocketConnector,
+        ctx: SinkContext,
+    ) -> crate::Result<Self> {
         let transformer = config.encoding.transformer();
         let serializer = config.encoding.build()?;
         let encoder = Encoder::<()>::new(serializer);
@@ -207,6 +214,7 @@ impl WebSocketSink {
             connector,
             ping_interval: config.ping_interval.filter(|v| *v > 0),
             ping_timeout: config.ping_timeout.filter(|v| *v > 0),
+            sources_details: ctx.sources_details.clone(),
         })
     }
 
@@ -289,6 +297,10 @@ impl WebSocketSink {
 
                 self.transformer.transform(&mut event);
                 let event_byte_size = event.estimated_json_encoded_size_of();
+                let source = event.metadata().source_id().and_then(|id| self
+                    .sources_details
+                    .get(id)
+                    .map(|details| details.key.id()));
 
                 let mut bytes = BytesMut::new();
                 let res = match self.encoder.encode(event, &mut bytes) {
@@ -302,7 +314,8 @@ impl WebSocketSink {
                         emit!(EventsSent {
                             count: 1,
                             byte_size: event_byte_size,
-                            output: None
+                            output: None,
+                            source,
                         });
                         bytes_sent.emit(ByteSize(message_len));
                     })
diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs
index 39c8d5a9458ba..831ae3e25adeb 100644
--- a/src/source_sender/mod.rs
+++ b/src/source_sender/mod.rs
@@ -44,20 +44,29 @@ impl Builder {
         }
     }
 
-    pub fn add_output(&mut self, output: Output) -> LimitedReceiver<EventArray> {
+    pub fn add_output(
+        &mut self,
+        output: Output,
+        source_id: (usize, String),
+    ) -> LimitedReceiver<EventArray> {
         match output.port {
             None => {
                 let (inner, rx) = Inner::new_with_buffer(
                     self.buf_size,
                     DEFAULT_OUTPUT.to_owned(),
                     self.lag_time.clone(),
+                    source_id,
                 );
                 self.inner = Some(inner);
                 rx
             }
             Some(name) => {
-                let (inner, rx) =
-                    Inner::new_with_buffer(self.buf_size, name.clone(), self.lag_time.clone());
+                let (inner, rx) = Inner::new_with_buffer(
+                    self.buf_size,
+                    name.clone(),
+                    self.lag_time.clone(),
+                    source_id,
+                );
                 self.named_inners.insert(name, inner);
                 rx
             }
@@ -90,9 +99,10 @@ impl SourceSender {
         }
     }
 
+    #[cfg(test)]
     pub fn new_with_buffer(n: usize) -> (Self, LimitedReceiver<EventArray>) {
         let lag_time = Some(register_histogram!(LAG_TIME_NAME));
-        let (inner, rx) = Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time);
+        let (inner, rx) =
+            Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, (0, DEFAULT_OUTPUT.to_owned()));
         (
             Self {
                 inner: Some(inner),
@@ -160,7 +170,7 @@ impl SourceSender {
     ) -> impl Stream<Item = EventArray> + Unpin {
         // The lag_time parameter here will need to be filled in if this function is ever used for
         // non-test situations.
-        let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None);
+        let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None, (0, name.clone()));
         let recv = recv.into_stream().map(move |mut events| {
             events.iter_events_mut().for_each(|mut event| {
                 let metadata = event.metadata_mut();
@@ -223,6 +233,12 @@ struct Inner {
     inner: LimitedSender<EventArray>,
     output: String,
     lag_time: Option<Histogram>,
+
+    /// The unique ID of the source to which this sender belongs.
+    ///
+    /// - The `usize` is the ID attached to individual event metadata.
+    /// - The `String` is used in the `EventsSent` internal event triggered by the sender.
+    source_id: (usize, String),
 }
 
 impl fmt::Debug for Inner {
@@ -240,10 +256,12 @@ impl Inner {
        n: usize,
        output: String,
        lag_time: Option<Histogram>,
+       source_id: (usize, String),
    ) -> (Self, LimitedReceiver<EventArray>) {
        let (tx, rx) = channel::limited(n);
        (
            Self {
+               source_id,
                inner: tx,
                output,
                lag_time,
@@ -252,11 +270,12 @@ impl Inner {
            },
            rx,
        )
    }
 
-    async fn send(&mut self, events: EventArray) -> Result<(), ClosedError> {
+    async fn send(&mut self, mut events: EventArray) -> Result<(), ClosedError> {
         let reference = Utc::now().timestamp_millis();
-        events
-            .iter_events()
-            .for_each(|event| self.emit_lag_time(event, reference));
+        events.iter_events_mut().for_each(|mut event| {
+            self.emit_lag_time(event.as_event_ref(), reference);
+            event.metadata_mut().set_source_id(self.source_id.0);
+        });
         let byte_size = events.estimated_json_encoded_size_of();
         let count = events.len();
         self.inner.send(events).await.map_err(|_| ClosedError)?;
         emit!(EventsSent {
             count,
             byte_size,
             output: Some(self.output.as_ref()),
+            source: Some(&self.source_id.1),
         });
         Ok(())
     }
@@ -310,6 +330,7 @@ impl Inner {
                 count,
                 byte_size,
                 output: Some(self.output.as_ref()),
+                source: Some(&self.source_id.1),
             });
             return Err(error.into());
         }
@@ -320,6 +341,7 @@ impl Inner {
             count,
             byte_size,
             output: Some(self.output.as_ref()),
+            source: Some(&self.source_id.1),
         });
 
         Ok(())
diff --git a/src/topology/builder.rs b/src/topology/builder.rs
index 6c2dc1d24af55..91903dcdbf5de 100644
--- a/src/topology/builder.rs
+++ b/src/topology/builder.rs
@@ -16,6 +16,7 @@ use tokio::{
     time::{timeout, Duration},
 };
 use tracing::Instrument;
+use vector_common::config::SourceDetails;
 use vector_config::NamedComponent;
 use vector_core::config::LogNamespace;
 use vector_core::{
@@ -26,7 +27,7 @@ use vector_core::{
         },
         BufferType, WhenFull,
     },
-    internal_event::EventsSent,
+    internal_event::{emit, EventsSent},
     schema::Definition,
     EstimatedJsonEncodedSizeOf,
 };
@@ -152,6 +153,8 @@ pub async fn build_pieces(
 
     let mut errors = vec![];
 
+    let mut sources_details = vec![];
+
     let (enrichment_tables, enrichment_errors) = load_enrichment_tables(config, diff).await;
     errors.extend(enrichment_errors);
 
@@ -179,6 +182,10 @@ pub async fn build_pieces(
             key.id()
         );
 
+        sources_details.push(SourceDetails { key: key.clone() });
+
+        let pipeline_source_id = sources_details.len() - 1;
+
         let mut builder = {
             let _span = span.enter();
             SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE)
@@ -188,7 +195,8 @@ pub async fn build_pieces(
 
         let mut schema_definitions = HashMap::with_capacity(source_outputs.len());
         for output in source_outputs {
-            let mut rx = builder.add_output(output.clone());
+            let mut rx =
+                builder.add_output(output.clone(), (pipeline_source_id, key.id().to_owned()));
 
             let (mut fanout, control) = Fanout::new();
             let pump = async move {
@@ -387,7 +395,8 @@ pub async fn build_pieces(
 
         inputs.insert(key.clone(), (input_tx, node.inputs.clone()));
 
-        let (transform_task, transform_outputs) = build_transform(transform, node, input_rx);
+        let (transform_task, transform_outputs) =
+            build_transform(transform, node, input_rx, sources_details.clone());
 
         outputs.extend(transform_outputs);
         tasks.insert(key.clone(), transform_task);
@@ -449,6 +458,7 @@ pub async fn build_pieces(
             globals: config.global.clone(),
             proxy: ProxyConfig::merge_with_env(&config.global.proxy, sink.proxy()),
             schema: config.schema,
+            sources_details: sources_details.clone(),
         };
 
         let (sink, healthcheck) = match sink.inner.build(cx).await {
@@ -622,17 +632,21 @@ fn build_transform(
     transform: Transform,
     node: TransformNode,
     input_rx: BufferReceiver<EventArray>,
+    sources_details: Vec<SourceDetails>,
 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
     match transform {
         // TODO: avoid the double boxing for function transforms here
-        Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx),
-        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx),
+        Transform::Function(t) => {
+            build_sync_transform(Box::new(t), node, input_rx, sources_details)
+        }
+        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, sources_details),
         Transform::Task(t) => build_task_transform(
             t,
             input_rx,
             node.input_details.data_type(),
             node.typetag,
             &node.key,
+            sources_details,
         ),
     }
 }
@@ -641,8 +655,9 @@ fn build_sync_transform(
     t: Box<dyn SyncTransform>,
     node: TransformNode,
     input_rx: BufferReceiver<EventArray>,
+    sources_details: Vec<SourceDetails>,
 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
-    let (outputs, controls) = TransformOutputs::new(node.outputs);
+    let (outputs, controls) = TransformOutputs::new(node.outputs, sources_details);
 
     let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs);
     let transform = if node.enable_concurrency {
@@ -822,6 +837,7 @@ fn build_task_transform(
     input_type: DataType,
     typetag: &str,
     key: &ComponentKey,
+    sources_details: Vec<SourceDetails>,
 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
     let (mut fanout, control) = Fanout::new();
 
     });
     let stream = t
         .transform(Box::pin(filtered))
-        .inspect(|events: &EventArray| {
-            emit!(EventsSent {
-                count: events.len(),
-                byte_size: events.estimated_json_encoded_size_of(),
-                output: None,
+        .inspect(move |events: &EventArray| {
+            let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new();
+            events.iter_events().for_each(|event| {
+                let size = event.estimated_json_encoded_size_of();
+
+                sources
+                    .entry(event.metadata().source_id())
+                    .and_modify(|(count, byte_size)| {
+                        *count += 1;
+                        *byte_size += size;
+                    })
+                    .or_insert((1, size));
             });
+
+            for (source_id, (count, byte_size)) in sources {
+                emit(EventsSent {
+                    count,
+                    byte_size,
+                    output: None,
+                    source: source_id
+                        .and_then(|id| sources_details.get(id).map(|details| details.key.id())),
+                });
+            }
         });
     let transform = async move {
         debug!("Task transform starting.");
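
Reviewer note (not part of the patch): every emitting site above repeats the same per-source aggregation, so a shared helper may be worth extracting in a follow-up. The sketch below is a minimal, self-contained illustration of that fold; `FakeEvent` and the literal sizes are invented for the example, whereas the real code operates on `EventRef`/`EventMetadata` and `EstimatedJsonEncodedSizeOf`.

```rust
use std::collections::HashMap;

// Invented stand-in for an event: just the two things the fold reads.
struct FakeEvent {
    source_id: Option<usize>, // `None` => no originating source (reported as "UNKNOWN")
    json_size: usize,         // stands in for estimated_json_encoded_size_of()
}

// Group events by originating source, accumulating (count, byte_size) per
// source -- the same pattern used by the transform, blackhole, and exporter hunks.
fn aggregate(events: &[FakeEvent]) -> HashMap<Option<usize>, (usize, usize)> {
    let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new();
    for event in events {
        sources
            .entry(event.source_id)
            .and_modify(|(count, byte_size)| {
                *count += 1;
                *byte_size += event.json_size;
            })
            .or_insert((1, event.json_size));
    }
    sources
}

fn main() {
    let events = [
        FakeEvent { source_id: Some(0), json_size: 100 },
        FakeEvent { source_id: Some(0), json_size: 50 },
        FakeEvent { source_id: None, json_size: 10 },
    ];
    // One `EventsSent` would be emitted per entry; a `None` key maps to "UNKNOWN".
    for (source_id, (count, byte_size)) in aggregate(&events) {
        println!("source={source_id:?} count={count} byte_size={byte_size}");
    }
}
```

The practical consequence is one counter series per `(source, output)` pair: since the `source` label value comes from `ComponentKey::id()` (or the fixed "UNKNOWN" fallback) rather than from event data, the added cardinality is bounded by the number of configured sources.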