From 1a13ca9714cf83581063ea5f416b06ee66796b44 Mon Sep 17 00:00:00 2001 From: Jean Mertz Date: Thu, 3 Nov 2022 20:40:38 +0100 Subject: [PATCH 01/12] checkpoint Signed-off-by: Jean Mertz --- lib/vector-core/src/event/metadata.rs | 11 +++ rfcs/2022-11-03-XXX-data-volume-insights.md | 81 +++++++++++++++++++++ src/config/sink.rs | 11 ++- src/source_sender/mod.rs | 37 +++++++--- src/topology/builder.rs | 19 ++++- 5 files changed, 145 insertions(+), 14 deletions(-) create mode 100644 rfcs/2022-11-03-XXX-data-volume-insights.md diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index 9b292c8c64469..2a51a0654e7ab 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -35,6 +35,11 @@ pub struct EventMetadata { /// TODO(Jean): must not skip serialization to track schemas across restarts. #[serde(default = "default_schema_definition", skip)] schema_definition: Arc, + + /// A unique identifier of the originating source of this event. + /// + /// Can be used internally to refer back to source details such as its component key. + source_id: usize, } fn default_metadata_value() -> Value { @@ -98,6 +103,7 @@ impl Default for EventMetadata { secrets: Secrets::new(), finalizers: Default::default(), schema_definition: default_schema_definition(), + source_id: 0, // TODO(Jean): There's not a good default here. } } } @@ -202,6 +208,11 @@ impl EventMetadata { pub fn set_schema_definition(&mut self, definition: &Arc) { self.schema_definition = Arc::clone(definition); } + + /// set the source ID. + pub fn set_source_id(&mut self, source_id: usize) { + self.source_id = source_id; + } } impl EventDataEq for EventMetadata { diff --git a/rfcs/2022-11-03-XXX-data-volume-insights.md b/rfcs/2022-11-03-XXX-data-volume-insights.md new file mode 100644 index 0000000000000..0629f626a591d --- /dev/null +++ b/rfcs/2022-11-03-XXX-data-volume-insights.md @@ -0,0 +1,81 @@ +# RFC - - + +One paragraph description of the change. + +## Context + +- Link to any previous issues, RFCs, or briefs (do not repeat that context in this RFC). + +## Cross cutting concerns + +- Link to any ongoing or future work relevant to this change. + +## Scope + +### In scope + +- List work being directly addressed with this RFC. + +### Out of scope + +- List work that is completely out of scope. Use this to keep discussions focused. Please note the "future changes" section at the bottom. + +## Pain + +- What internal or external *pain* are we solving? +- Do not cover benefits of your change, this is covered in the "Rationale" section. + +## Proposal + +### User Experience + +- Explain your change as if you were describing it to a Vector user. We should be able to share this section with a Vector user to solicit feedback. +- Does this change break backward compatibility? If so, what should users do to upgrade? + +### Implementation + +- Explain your change as if you were presenting it to the Vector team. +- When possible, demonstrate with pseudo code not text. +- Be specific. Be opinionated. Avoid ambiguity. + +## Rationale + +- Why is this change worth it? +- What is the impact of not doing this? +- How does this position us for success in the future? + +## Drawbacks + +- Why should we not do this? +- What kind on ongoing burden does this place on the team? + +## Prior Art + +- List prior art, the good and bad. +- Why can't we simply use or copy them? + +## Alternatives + +- What other approaches have been considered and why did you not choose them? +- How about not doing this at all? + +## Outstanding Questions + +- List any remaining questions. +- Use this to resolve ambiguity and collaborate with your team during the RFC process. +- *These must be resolved before the RFC can be merged.* + +## Plan Of Attack + +Incremental steps to execute this change. These will be converted to issues after the RFC is approved: + +- [ ] Submit a PR with spike-level code _roughly_ demonstrating the change. +- [ ] Incremental change #1 +- [ ] Incremental change #2 +- [ ] ... + +Note: This can be filled out during the review process. + +## Future Improvements + +- List any future improvements. Use this to keep your "plan of attack" scope small and project a sound design. diff --git a/src/config/sink.rs b/src/config/sink.rs index c777525ebcb68..0addc95c5f4f7 100644 --- a/src/config/sink.rs +++ b/src/config/sink.rs @@ -9,7 +9,10 @@ use vector_core::{ }; use super::{id::Inputs, schema, ComponentKey, ProxyConfig, Resource}; -use crate::sinks::{util::UriSerde, Healthcheck, Sinks}; +use crate::{ + sinks::{util::UriSerde, Healthcheck, Sinks}, + topology::builder::SourceDetails, +}; /// Fully resolved sink component. #[configurable_component] @@ -211,6 +214,7 @@ pub struct SinkContext { pub globals: GlobalOptions, pub proxy: ProxyConfig, pub schema: schema::Options, + pub sources_details: Vec<SourceDetails>, } impl SinkContext { @@ -221,6 +225,7 @@ impl SinkContext { globals: GlobalOptions::default(), proxy: ProxyConfig::default(), schema: schema::Options::default(), + sources_details: vec![], } } @@ -231,4 +236,8 @@ impl SinkContext { pub const fn proxy(&self) -> &ProxyConfig { &self.proxy } + + pub fn source_details(&self, id: usize) -> Option<&SourceDetails> { + self.sources_details.get(id) + } } diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 39c8d5a9458ba..17368475e8817 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -5,6 +5,7 @@ use futures::{Stream, StreamExt}; use metrics::{register_histogram, Histogram}; use value::Value; use vector_buffers::topology::channel::{self, LimitedReceiver, LimitedSender}; +use vector_common::config::ComponentKey; #[cfg(test)] use vector_core::event::{into_event_stream, EventStatus}; use vector_core::{ @@ -26,6 +27,7 @@ const TEST_BUFFER_SIZE: usize = 100; const LAG_TIME_NAME: &str = "source_lag_time_seconds"; pub struct Builder { + id: usize, buf_size: usize, inner: Option<Inner>, named_inners: HashMap<String, Inner>, @@ -38,6 +40,7 @@ impl Builder { pub fn with_buffer(self, n: usize) -> Self { Self { buf_size: n, + id: self.id, inner: self.inner, named_inners: self.named_inners, lag_time: self.lag_time, @@ -68,6 +71,7 @@ impl Builder { #[allow(clippy::missing_const_for_fn)] pub fn build(self) -> SourceSender { SourceSender { + id: self.id, inner: self.inner, named_inners: self.named_inners, } @@ -76,13 +80,15 @@ impl Builder { #[derive(Debug, Clone)] pub struct SourceSender { + id: usize, inner: Option<Inner>, named_inners: HashMap<String, Inner>, } impl SourceSender { - pub fn builder() -> Builder { + pub fn builder(id: usize) -> Builder { Builder { + id, buf_size: CHUNK_SIZE, inner: None, named_inners: Default::default(), @@ -90,11 +96,13 @@ impl SourceSender { } } - pub fn new_with_buffer(n: usize) -> (Self, LimitedReceiver<EventArray>) { + #[cfg(test)] + pub fn new_with_buffer(id: usize, n: usize) -> (Self, LimitedReceiver<EventArray>) { let lag_time = Some(register_histogram!(LAG_TIME_NAME)); let (inner, rx) = Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time); ( Self { + id, inner: Some(inner), named_inners: Default::default(), }, @@ -104,14 +112,14 @@ impl SourceSender { #[cfg(test)] pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_with_buffer(0, TEST_BUFFER_SIZE); let recv = recv.into_stream().flat_map(into_event_stream); (pipe, recv) } #[cfg(test)] pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_with_buffer(0, TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -130,7 +138,7 @@ impl SourceSender { pub fn new_test_errors( error_at: impl Fn(usize) -> bool, ) -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_with_buffer(0, TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -177,7 +185,7 @@ impl SourceSender { self.inner .as_mut() .expect("no default output") - .send_event(event) + .send_event(event, self.id) .await } @@ -252,11 +260,12 @@ impl Inner { ) } - async fn send(&mut self, events: EventArray) -> Result<(), ClosedError> { + async fn send(&mut self, events: EventArray, source_id: usize) -> Result<(), ClosedError> { let reference = Utc::now().timestamp_millis(); - events - .iter_events() - .for_each(|event| self.emit_lag_time(event, reference)); + events.iter_events_mut().for_each(|event| { + self.emit_lag_time(event, reference); + event.metadata_mut().set_source_id(source_id); + }); let byte_size = events.estimated_json_encoded_size_of(); let count = events.len(); self.inner.send(events).await.map_err(|_| ClosedError)?; @@ -268,8 +277,12 @@ impl Inner { Ok(()) } - async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> { - self.send(event.into()).await + async fn send_event( + &mut self, + event: impl Into<EventArray>, + source_id: usize, + ) -> Result<(), ClosedError> { + self.send(event.into(), source_id).await } async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError> diff --git a/src/topology/builder.rs b/src/topology/builder.rs index b3b0443abf343..e0bd96ae97cf2 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -135,6 +135,16 @@ pub struct Pieces { pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>, } +// #[derive(Debug, Clone, Default, Eq, PartialEq)] +// pub struct PipelineDetails { +// pub sources: Vec<SourceDetails>, +// } + +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct SourceDetails { + pub key: ComponentKey, +} + /// Builds only the new pieces, and doesn't check their topology. pub async fn build_pieces( config: &super::Config, @@ -151,6 +161,8 @@ pub async fn build_pieces( let mut errors = vec![]; + let mut sources_details = vec![]; + let (enrichment_tables, enrichment_errors) = load_enrichment_tables(config, diff).await; errors.extend(enrichment_errors); @@ -178,9 +190,13 @@ pub async fn build_pieces( key.id() ); + sources_details.push(SourceDetails { key: key.clone() }); + + let pipeline_source_id = sources_details.len() - 1; + let mut builder = { let _span = span.enter(); - SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE) + SourceSender::builder(pipeline_source_id).with_buffer(*SOURCE_SENDER_BUFFER_SIZE) }; let mut pumps = Vec::new(); let mut controls = HashMap::new(); @@ -439,6 +455,7 @@ pub async fn build_pieces( globals: config.global.clone(), proxy: ProxyConfig::merge_with_env(&config.global.proxy, sink.proxy()), schema: config.schema, + sources_details: sources_details.clone(), }; let (sink, healthcheck) = match sink.inner.build(cx).await { From 7aa35f76efd2dce7621a02a8b3728b2f5bd723f2 Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Fri, 2 Dec 2022 16:45:28 +0100 Subject: [PATCH 02/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- lib/vector-core/src/event/ref.rs | 9 +++++++++ src/source_sender/mod.rs | 7 +++---- src/sources/socket/mod.rs | 2 +- src/sources/statsd/mod.rs | 4 ++-- src/test_util/mock/mod.rs | 12 ++++++------ 5 files changed, 21 insertions(+), 13 deletions(-) diff --git a/lib/vector-core/src/event/ref.rs b/lib/vector-core/src/event/ref.rs index cc8b3c7512d25..8f7cff91655b4 100644 --- a/lib/vector-core/src/event/ref.rs +++ b/lib/vector-core/src/event/ref.rs @@ -118,6 +118,15 @@ pub enum EventMutRef<'a> { } impl<'a> EventMutRef<'a> { + /// Convert an `EventMutRef` to an `EventRef`. + pub fn as_event_ref(&'a self) -> EventRef<'a> { + match self { + EventMutRef::Log(v) => EventRef::Log(v), + EventMutRef::Metric(v) => EventRef::Metric(v), + EventMutRef::Trace(v) => EventRef::Trace(v), + } + } + /// Extract the `LogEvent` reference in this. /// /// # Panics diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 17368475e8817..9b5283eb0dcdc 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -5,7 +5,6 @@ use futures::{Stream, StreamExt}; use metrics::{register_histogram, Histogram}; use value::Value; use vector_buffers::topology::channel::{self, LimitedReceiver, LimitedSender}; -use vector_common::config::ComponentKey; #[cfg(test)] use vector_core::event::{into_event_stream, EventStatus}; use vector_core::{ @@ -260,10 +259,10 @@ impl Inner { ) } - async fn send(&mut self, events: EventArray, source_id: usize) -> Result<(), ClosedError> { + async fn send(&mut self, mut events: EventArray, source_id: usize) -> Result<(), ClosedError> { let reference = Utc::now().timestamp_millis(); - events.iter_events_mut().for_each(|event| { - self.emit_lag_time(event, reference); + events.iter_events_mut().for_each(|mut event| { + self.emit_lag_time(event.as_event_ref(), reference); event.metadata_mut().set_source_id(source_id); }); let byte_size = events.estimated_json_encoded_size_of(); diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 42c8cd5741a86..9b4ca64261d8e 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -685,7 +685,7 @@ mod test { // shutdown. let addr = next_addr(); - let (source_tx, source_rx) = SourceSender::new_with_buffer(10_000); + let (source_tx, source_rx) = SourceSender::new_with_buffer(0, 10_000); let source_key = ComponentKey::from("tcp_shutdown_infinite_stream"); let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx); diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index 1bcc0fc12d056..0994740ab1c39 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -428,7 +428,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, rx) = SourceSender::new_with_buffer(4096); + let (tx, rx) = SourceSender::new_with_buffer(0, 4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) @@ -504,7 +504,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, _rx) = SourceSender::new_with_buffer(4096); + let (tx, _rx) = SourceSender::new_with_buffer(0, 4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) diff --git a/src/test_util/mock/mod.rs b/src/test_util/mock/mod.rs index 40c430e4e70a4..670e934fafeef 100644 --- a/src/test_util/mock/mod.rs +++ b/src/test_util/mock/mod.rs @@ -30,12 +30,12 @@ pub fn backpressure_source(counter: &Arc<AtomicUsize>) -> BackpressureSourceConf } pub fn basic_source() -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_with_buffer(0, 1); (tx, BasicSourceConfig::new(rx)) } pub fn basic_source_with_data(data: &str) -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_with_buffer(0, 1); (tx, BasicSourceConfig::new_with_data(rx, data)) } @@ -43,7 +43,7 @@ pub fn basic_source_with_event_counter( force_shutdown: bool, ) -> (SourceSender, BasicSourceConfig, Arc<AtomicUsize>) { let event_counter = Arc::new(AtomicUsize::new(0)); - let (tx, rx) = SourceSender::new_with_buffer(1); + let (tx, rx) = SourceSender::new_with_buffer(0, 1); let mut source = BasicSourceConfig::new_with_event_counter(rx, Arc::clone(&event_counter)); source.set_force_shutdown(force_shutdown); @@ -71,7 +71,7 @@ pub const fn backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig } pub fn basic_sink(channel_size: usize) -> (impl Stream<Item = EventArray>, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_with_buffer(0, channel_size); let sink = BasicSinkConfig::new(tx, true); (rx.into_stream(), sink) } @@ -80,7 +80,7 @@ pub fn basic_sink_with_data( channel_size: usize, data: &str, ) -> (impl Stream<Item = EventArray>, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_with_buffer(0, channel_size); let sink = BasicSinkConfig::new_with_data(tx, true, data); (rx.into_stream(), sink) } @@ -88,7 +88,7 @@ pub fn basic_sink_with_data( pub fn basic_sink_failing_healthcheck( channel_size: usize, ) -> (impl Stream<Item = EventArray>, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(channel_size); + let (tx, rx) = SourceSender::new_with_buffer(0, channel_size); let sink = BasicSinkConfig::new(tx, false); (rx.into_stream(), sink) } From ad13694ec1528050f545ce01395448afcf2f39f9 Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Thu, 8 Dec 2022 14:22:42 +0100 Subject: [PATCH 03/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- .../src/internal_event/events_sent.rs | 19 ++++--- lib/vector-core/src/event/metadata.rs | 14 +++-- lib/vector-core/src/stream/driver.rs | 1 + lib/vector-core/src/transform/mod.rs | 2 + src/sinks/blackhole/sink.rs | 1 + src/sinks/console/sink.rs | 1 + src/sinks/file/mod.rs | 28 +++++++--- src/sinks/nats.rs | 17 +++++-- src/sinks/prometheus/exporter.rs | 3 +- src/sinks/pulsar.rs | 1 + src/sinks/util/sink.rs | 3 +- src/sinks/websocket/config.rs | 4 +- src/sinks/websocket/sink.rs | 17 ++++++- src/source_sender/mod.rs | 51 ++++++++++--------- src/sources/socket/mod.rs | 2 +- src/sources/statsd/mod.rs | 4 +- src/test_util/mock/mod.rs | 12 ++--- src/topology/builder.rs | 5 +- 18 files changed, 120 insertions(+), 65 deletions(-) diff --git a/lib/vector-common/src/internal_event/events_sent.rs b/lib/vector-common/src/internal_event/events_sent.rs index ce1517af1a412..8b823b7028290 100644 --- a/lib/vector-common/src/internal_event/events_sent.rs +++ b/lib/vector-common/src/internal_event/events_sent.rs @@ -10,25 +10,28 @@ pub struct EventsSent<'a> { pub count: usize, pub byte_size: usize, pub output: Option<&'a str>, + pub source: Option<&'a str>, } impl<'a> InternalEvent for EventsSent<'a> { fn emit(self) { + let source = self.source.unwrap_or("UNKNOWN"); + if let Some(output) = self.output { - trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size, output = %output); + trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size, source = %source, output = %output); } else { - trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size); + trace!(message = "Events sent.", count = %self.count, byte_size = %self.byte_size, source = %source); } if self.count > 0 { if let Some(output) = self.output { - counter!("component_sent_events_total", self.count as u64, "output" => output.to_owned()); - counter!("events_out_total", self.count as u64, "output" => output.to_owned()); - counter!("component_sent_event_bytes_total", self.byte_size as u64, "output" => output.to_owned()); + counter!("component_sent_events_total", self.count as u64, "source" => source.to_owned(), "output" => output.to_owned()); + counter!("events_out_total", self.count as u64, "source" => source.to_owned(), "output" => output.to_owned()); + counter!("component_sent_event_bytes_total", self.byte_size as u64, "source" => source.to_owned(), "output" => output.to_owned()); } else { - counter!("component_sent_events_total", self.count as u64); - counter!("events_out_total", self.count as u64); - counter!("component_sent_event_bytes_total", self.byte_size as u64); + counter!("component_sent_events_total", self.count as u64, "source" => source.to_owned()); + counter!("events_out_total", self.count as u64, "source" => source.to_owned()); + counter!("component_sent_event_bytes_total", self.byte_size as u64, "source" => source.to_owned()); } } } diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index 2a51a0654e7ab..d658f3c03e34d 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -39,7 +39,10 @@ pub struct EventMetadata { /// A unique identifier of the originating source of this event. /// /// Can be used internally to refer back to source details such as its component key. - source_id: usize, + /// + /// If `None`, then the event has no originating source (e.g. it was created internally, such + /// as in the Lua or Remap transforms). + source_id: Option<usize>, } fn default_metadata_value() -> Value { @@ -103,7 +106,7 @@ impl Default for EventMetadata { secrets: Secrets::new(), finalizers: Default::default(), schema_definition: default_schema_definition(), - source_id: 0, // TODO(Jean): There's not a good default here. + source_id: None, } } } @@ -211,7 +214,12 @@ impl EventMetadata { /// set the source ID. pub fn set_source_id(&mut self, source_id: usize) { - self.source_id = source_id; + self.source_id = Some(source_id); + } + + /// Get the source ID. + pub fn source_id(&self) -> Option<usize> { + self.source_id } } diff --git a/lib/vector-core/src/stream/driver.rs b/lib/vector-core/src/stream/driver.rs index f1b1988582e42..5ca4d46636ddb 100644 --- a/lib/vector-core/src/stream/driver.rs +++ b/lib/vector-core/src/stream/driver.rs @@ -192,6 +192,7 @@ where count: cbs.0, byte_size: cbs.1, output: None, + source: None, }); // This condition occurs specifically when the `HttpBatchService::call()` is called *within* the `Service::call()` diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index 142087cd94493..19624db731547 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -281,6 +281,7 @@ impl TransformOutputs { count, byte_size, output: Some(DEFAULT_OUTPUT), + source: None, }); } @@ -293,6 +294,7 @@ impl TransformOutputs { count, byte_size, output: Some(key.as_ref()), + source: None, }); } diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index ba6d8b8b62d67..12e5aa21b7984 100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -96,6 +96,7 @@ impl StreamSink<EventArray> for BlackholeSink { count: events.len(), byte_size: message_len, output: None, + source: None, }); emit!(BytesSent { diff --git a/src/sinks/console/sink.rs b/src/sinks/console/sink.rs index d303131fcf2fe..38214c4c11bde 100644 --- a/src/sinks/console/sink.rs +++ b/src/sinks/console/sink.rs @@ -54,6 +54,7 @@ where byte_size: event_byte_size, count: 1, output: None, + source: None, }); bytes_sent.emit(ByteSize(bytes.len())); } diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index f0e21ada44f75..36fe4eb57e3be 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -28,6 +28,7 @@ use crate::{ internal_events::{FileBytesSent, FileIoError, FileOpen, TemplateRenderingError}, sinks::util::StreamSink, template::Template, + topology::builder::SourceDetails, }; mod bytes_path; use std::convert::TryFrom; @@ -152,9 +153,9 @@ impl OutFile { impl SinkConfig for FileSinkConfig { async fn build( &self, - _cx: SinkContext, + ctx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let sink = FileSink::new(self)?; + let sink = FileSink::new(self, ctx.sources_details.clone())?; Ok(( super::VectorSink::from_event_streamsink(sink), future::ok(()).boxed(), @@ -178,10 +179,14 @@ pub struct FileSink { idle_timeout: Duration, files: ExpiringHashMap<Bytes, OutFile>, compression: Compression, + sources_details: Vec<SourceDetails>, } impl FileSink { - pub fn new(config: &FileSinkConfig) -> crate::Result<Self> { + pub fn new( + config: &FileSinkConfig, + sources_details: Vec<SourceDetails>, + ) -> crate::Result<Self> { let transformer = config.encoding.transformer(); let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::<Framer>::new(framer, serializer); @@ -193,6 +198,7 @@ impl FileSink { idle_timeout: Duration::from_secs(config.idle_timeout_secs.unwrap_or(30)), files: ExpiringHashMap::default(), compression: config.compression, + sources_details, }) } @@ -335,6 +341,11 @@ impl FileSink { trace!(message = "Writing an event to file.", path = ?path); let event_size = event.estimated_json_encoded_size_of(); let finalizers = event.take_finalizers(); + let source = event + .metadata() + .source_id() + .and_then(|id| self.sources_details.get(id).map(|details| details.key.id())); + match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await { Ok(byte_size) => { finalizers.update_status(EventStatus::Delivered); @@ -342,6 +353,7 @@ impl FileSink { count: 1, byte_size: event_size, output: None, + source, }); emit!(FileBytesSent { byte_size, @@ -439,7 +451,7 @@ mod tests { acknowledgements: Default::default(), }; - let sink = FileSink::new(&config).unwrap(); + let sink = FileSink::new(&config, vec![]).unwrap(); let (input, _events) = random_lines_with_stream(100, 64, None); let events = Box::pin(stream::iter( @@ -475,7 +487,7 @@ mod tests { acknowledgements: Default::default(), }; - let sink = FileSink::new(&config).unwrap(); + let sink = FileSink::new(&config, vec![]).unwrap(); let (input, _) = random_lines_with_stream(100, 64, None); let events = Box::pin(stream::iter( @@ -511,7 +523,7 @@ mod tests { acknowledgements: Default::default(), }; - let sink = FileSink::new(&config).unwrap(); + let sink = FileSink::new(&config, vec![]).unwrap(); let (input, _) = random_lines_with_stream(100, 64, None); let events = Box::pin(stream::iter( @@ -552,7 +564,7 @@ mod tests { acknowledgements: Default::default(), }; - let sink = FileSink::new(&config).unwrap(); + let sink = FileSink::new(&config, vec![]).unwrap(); let (mut input, _events) = random_events_with_stream(32, 8, None); input[0].as_mut_log().insert("date", "2019-26-07"); @@ -637,7 +649,7 @@ mod tests { acknowledgements: Default::default(), }; - let sink = FileSink::new(&config).unwrap(); + let sink = FileSink::new(&config, vec![]).unwrap(); let (mut input, _events) = random_lines_with_stream(10, 64, None); let (mut tx, rx) = futures::channel::mpsc::channel(0); diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index 307e9c80040e5..8526f11ab1bad 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -20,6 +20,7 @@ use crate::{ sinks::util::StreamSink, template::{Template, TemplateParseError}, tls::TlsEnableableConfig, + topology::builder::SourceDetails, }; #[derive(Debug, Snafu)] @@ -99,9 +100,9 @@ impl GenerateConfig for NatsSinkConfig { impl SinkConfig for NatsSinkConfig { async fn build( &self, - _cx: SinkContext, + ctx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { - let sink = NatsSink::new(self.clone()).await?; + let sink = NatsSink::new(self.clone(), ctx).await?; let healthcheck = healthcheck(self.clone()).boxed(); Ok((super::VectorSink::from_event_streamsink(sink), healthcheck)) } @@ -140,20 +141,23 @@ pub struct NatsSink { encoder: Encoder<()>, connection: nats::asynk::Connection, subject: Template, + sources_details: Vec<SourceDetails>, } impl NatsSink { - async fn new(config: NatsSinkConfig) -> Result<Self, BuildError> { + async fn new(config: NatsSinkConfig, ctx: SinkContext) -> Result<Self, BuildError> { let connection = config.connect().await?; let transformer = config.encoding.transformer(); let serializer = config.encoding.build().context(EncodingSnafu)?; let encoder = Encoder::<()>::new(serializer); + let sources_details = ctx.sources_details.clone(); Ok(NatsSink { connection, transformer, encoder, subject: Template::try_from(config.subject).context(SubjectTemplateSnafu)?, + sources_details, }) } } @@ -182,6 +186,10 @@ impl StreamSink<Event> for NatsSink { self.transformer.transform(&mut event); let event_byte_size = event.estimated_json_encoded_size_of(); + let source = event + .metadata() + .source_id() + .and_then(|id| self.sources_details.get(id).map(|details| details.key.id())); let mut bytes = BytesMut::new(); if self.encoder.encode(event, &mut bytes).is_err() { @@ -202,7 +210,8 @@ impl StreamSink<Event> for NatsSink { emit!(EventsSent { byte_size: event_byte_size, count: 1, - output: None + output: None, + source, }); bytes_sent.emit(ByteSize(bytes.len())); } diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 078a948e3b70e..1a8c5706240c8 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -422,7 +422,8 @@ fn handle( emit!(EventsSent { count, byte_size, - output: None + output: None, + source: None, }); } diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs index 33357acf4e1f8..0ab042bfd5dfd 100644 --- a/src/sinks/pulsar.rs +++ b/src/sinks/pulsar.rs @@ -373,6 +373,7 @@ impl Sink<Event> for PulsarSink { count: metadata.event_count(), byte_size: metadata.events_estimated_json_encoded_byte_size(), output: None, + source: None, }); this.bytes_sent diff --git a/src/sinks/util/sink.rs b/src/sinks/util/sink.rs index 17be9f59f1d7b..18f586de6b181 100644 --- a/src/sinks/util/sink.rs +++ b/src/sinks/util/sink.rs @@ -451,7 +451,8 @@ where emit!(EventsSent { count, byte_size, - output: None + output: None, + source: None, }); // TODO: Emit a BytesSent event here too } diff --git a/src/sinks/websocket/config.rs b/src/sinks/websocket/config.rs index c54aa1d7880e6..21f4e0dd8d189 100644 --- a/src/sinks/websocket/config.rs +++ b/src/sinks/websocket/config.rs @@ -65,9 +65,9 @@ impl GenerateConfig for WebSocketSinkConfig { #[async_trait::async_trait] impl SinkConfig for WebSocketSinkConfig { - async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let connector = self.build_connector()?; - let ws_sink = WebSocketSink::new(self, connector.clone())?; + let ws_sink = WebSocketSink::new(self, connector.clone(), ctx)?; Ok(( VectorSink::from_event_streamsink(ws_sink), diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index 9361ac457b462..c85ff2aee11e2 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -30,6 +30,7 @@ use vector_core::{ use crate::{ codecs::{Encoder, Transformer}, + config::SinkContext, dns, emit, event::{Event, EventStatus, Finalizable}, http::Auth, @@ -40,6 +41,7 @@ use crate::{ sinks::util::{retries::ExponentialBackoff, StreamSink}, sinks::websocket::config::WebSocketSinkConfig, tls::{MaybeTlsSettings, MaybeTlsStream, TlsError}, + topology::builder::SourceDetails, }; #[derive(Debug, Snafu)] @@ -193,10 +195,15 @@ pub struct WebSocketSink { connector: WebSocketConnector, ping_interval: Option<u64>, ping_timeout: Option<u64>, + sources_details: Vec<SourceDetails>, } impl WebSocketSink { - pub fn new(config: &WebSocketSinkConfig, connector: WebSocketConnector) -> crate::Result<Self> { + pub fn new( + config: &WebSocketSinkConfig, + connector: WebSocketConnector, + ctx: SinkContext, + ) -> crate::Result<Self> { let transformer = config.encoding.transformer(); let serializer = config.encoding.build()?; let encoder = Encoder::<()>::new(serializer); @@ -207,6 +214,7 @@ impl WebSocketSink { connector, ping_interval: config.ping_interval.filter(|v| *v > 0), ping_timeout: config.ping_timeout.filter(|v| *v > 0), + sources_details: ctx.sources_details.clone(), }) } @@ -289,6 +297,10 @@ impl WebSocketSink { self.transformer.transform(&mut event); let event_byte_size = event.estimated_json_encoded_size_of(); + let source = event.metadata().source_id().and_then(|id| self + .sources_details + .get(id) + .map(|details| details.key.id())); let mut bytes = BytesMut::new(); let res = match self.encoder.encode(event, &mut bytes) { @@ -302,7 +314,8 @@ impl WebSocketSink { emit!(EventsSent { count: 1, byte_size: event_byte_size, - output: None + output: None, + source, }); bytes_sent.emit(ByteSize(message_len)); }) diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 9b5283eb0dcdc..3191cc870fd4f 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -26,7 +26,6 @@ const TEST_BUFFER_SIZE: usize = 100; const LAG_TIME_NAME: &str = "source_lag_time_seconds"; pub struct Builder { - id: usize, buf_size: usize, inner: Option<Inner>, named_inners: HashMap<String, Inner>, @@ -39,27 +38,31 @@ impl Builder { pub fn with_buffer(self, n: usize) -> Self { Self { buf_size: n, - id: self.id, inner: self.inner, named_inners: self.named_inners, lag_time: self.lag_time, } } - pub fn add_output(&mut self, output: Output) -> LimitedReceiver<EventArray> { + pub fn add_output(&mut self, output: Output, source_id: usize) -> LimitedReceiver<EventArray> { match output.port { None => { let (inner, rx) = Inner::new_with_buffer( self.buf_size, DEFAULT_OUTPUT.to_owned(), self.lag_time.clone(), + source_id, ); self.inner = Some(inner); rx } Some(name) => { - let (inner, rx) = - Inner::new_with_buffer(self.buf_size, name.clone(), self.lag_time.clone()); + let (inner, rx) = Inner::new_with_buffer( + self.buf_size, + name.clone(), + self.lag_time.clone(), + source_id, + ); self.named_inners.insert(name, inner); rx } @@ -70,7 +73,6 @@ impl Builder { #[allow(clippy::missing_const_for_fn)] pub fn build(self) -> SourceSender { SourceSender { - id: self.id, inner: self.inner, named_inners: self.named_inners, } @@ -79,15 +81,13 @@ impl Builder { #[derive(Debug, Clone)] pub struct SourceSender { - id: usize, inner: Option<Inner>, named_inners: HashMap<String, Inner>, } impl SourceSender { - pub fn builder(id: usize) -> Builder { + pub fn builder() -> Builder { Builder { - id, buf_size: CHUNK_SIZE, inner: None, named_inners: Default::default(), @@ -96,12 +96,11 @@ impl SourceSender { } #[cfg(test)] - pub fn new_with_buffer(id: usize, n: usize) -> (Self, LimitedReceiver<EventArray>) { + pub fn new_with_buffer(n: usize) -> (Self, LimitedReceiver<EventArray>) { let lag_time = Some(register_histogram!(LAG_TIME_NAME)); - let (inner, rx) = Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time); + let (inner, rx) = Inner::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, 0); ( Self { - id, inner: Some(inner), named_inners: Default::default(), }, @@ -111,14 +110,14 @@ impl SourceSender { #[cfg(test)] pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(0, TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); let recv = recv.into_stream().flat_map(into_event_stream); (pipe, recv) } #[cfg(test)] pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(0, TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -137,7 +136,7 @@ impl SourceSender { pub fn new_test_errors( error_at: impl Fn(usize) -> bool, ) -> (Self, impl Stream<Item = Event> + Unpin) { - let (pipe, recv) = Self::new_with_buffer(0, TEST_BUFFER_SIZE); + let (pipe, recv) = Self::new_with_buffer(TEST_BUFFER_SIZE); // In a source test pipeline, there is no sink to acknowledge // events, so we have to add a map to the receiver to handle the // finalization. @@ -167,7 +166,7 @@ impl SourceSender { ) -> impl Stream<Item = EventArray> + Unpin { // The lag_time parameter here will need to be filled in if this function is ever used for // non-test situations. - let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None); + let (inner, recv) = Inner::new_with_buffer(100, name.clone(), None, 0); let recv = recv.into_stream().map(move |mut events| { events.iter_events_mut().for_each(|mut event| { let metadata = event.metadata_mut(); @@ -184,7 +183,7 @@ impl SourceSender { self.inner .as_mut() .expect("no default output") - .send_event(event, self.id) + .send_event(event) .await } @@ -227,6 +226,7 @@ impl SourceSender { #[derive(Clone)] struct Inner { + source_id: usize, inner: LimitedSender<EventArray>, output: String, lag_time: Option<Histogram>, @@ -247,10 +247,12 @@ impl Inner { n: usize, output: String, lag_time: Option<Histogram>, + source_id: usize, ) -> (Self, LimitedReceiver<EventArray>) { let (tx, rx) = channel::limited(n); ( Self { + source_id, inner: tx, output, lag_time, @@ -259,11 +261,11 @@ impl Inner { ) } - async fn send(&mut self, mut events: EventArray, source_id: usize) -> Result<(), ClosedError> { + async fn send(&mut self, mut events: EventArray) -> Result<(), ClosedError> { let reference = Utc::now().timestamp_millis(); events.iter_events_mut().for_each(|mut event| { self.emit_lag_time(event.as_event_ref(), reference); - event.metadata_mut().set_source_id(source_id); + event.metadata_mut().set_source_id(self.source_id); }); let byte_size = events.estimated_json_encoded_size_of(); let count = events.len(); @@ -272,16 +274,13 @@ impl Inner { count, byte_size, output: Some(self.output.as_ref()), + source: None, }); Ok(()) } - async fn send_event( - &mut self, - event: impl Into<EventArray>, - source_id: usize, - ) -> Result<(), ClosedError> { - self.send(event.into(), source_id).await + async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> { + self.send(event.into()).await } async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError> @@ -322,6 +321,7 @@ impl Inner { count, byte_size, output: Some(self.output.as_ref()), + source: None, }); return Err(error.into()); } @@ -332,6 +332,7 @@ impl Inner { count, byte_size, output: Some(self.output.as_ref()), + source: None, }); Ok(()) diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 9b4ca64261d8e..42c8cd5741a86 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -685,7 +685,7 @@ mod test { // shutdown. let addr = next_addr(); - let (source_tx, source_rx) = SourceSender::new_with_buffer(0, 10_000); + let (source_tx, source_rx) = SourceSender::new_with_buffer(10_000); let source_key = ComponentKey::from("tcp_shutdown_infinite_stream"); let (source_cx, mut shutdown) = SourceContext::new_shutdown(&source_key, source_tx); diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index 0994740ab1c39..1bcc0fc12d056 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -428,7 +428,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, rx) = SourceSender::new_with_buffer(0, 4096); + let (tx, rx) = SourceSender::new_with_buffer(4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) @@ -504,7 +504,7 @@ mod test { // packet we send has a lot of metrics per packet. We could technically count them all up // and have a more accurate number here, but honestly, who cares? This is big enough. let component_key = ComponentKey::from("statsd"); - let (tx, _rx) = SourceSender::new_with_buffer(0, 4096); + let (tx, _rx) = SourceSender::new_with_buffer(4096); let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx); let sink = statsd_config .build(source_ctx) diff --git a/src/test_util/mock/mod.rs b/src/test_util/mock/mod.rs index 670e934fafeef..40c430e4e70a4 100644 --- a/src/test_util/mock/mod.rs +++ b/src/test_util/mock/mod.rs @@ -30,12 +30,12 @@ pub fn backpressure_source(counter: &Arc<AtomicUsize>) -> BackpressureSourceConf } pub fn basic_source() -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(0, 1); + let (tx, rx) = SourceSender::new_with_buffer(1); (tx, BasicSourceConfig::new(rx)) } pub fn basic_source_with_data(data: &str) -> (SourceSender, BasicSourceConfig) { - let (tx, rx) = SourceSender::new_with_buffer(0, 1); + let (tx, rx) = SourceSender::new_with_buffer(1); (tx, BasicSourceConfig::new_with_data(rx, data)) } @@ -43,7 +43,7 @@ pub fn basic_source_with_event_counter( force_shutdown: bool, ) -> (SourceSender, BasicSourceConfig, Arc<AtomicUsize>) { let event_counter = Arc::new(AtomicUsize::new(0)); - let (tx, rx) = SourceSender::new_with_buffer(0, 1); + let (tx, rx) = SourceSender::new_with_buffer(1); let mut source = BasicSourceConfig::new_with_event_counter(rx, Arc::clone(&event_counter)); source.set_force_shutdown(force_shutdown); @@ -71,7 +71,7 @@ pub const fn backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig } pub fn basic_sink(channel_size: usize) -> (impl Stream<Item = EventArray>, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(0, channel_size); + let (tx, rx) = SourceSender::new_with_buffer(channel_size); let sink = BasicSinkConfig::new(tx, true); (rx.into_stream(), sink) } @@ -80,7 +80,7 @@ pub fn basic_sink_with_data( channel_size: usize, data: &str, ) -> (impl Stream<Item = EventArray>, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(0, channel_size); + let (tx, rx) = SourceSender::new_with_buffer(channel_size); let sink = BasicSinkConfig::new_with_data(tx, true, data); (rx.into_stream(), sink) } @@ -88,7 +88,7 @@ pub fn basic_sink_with_data( pub fn basic_sink_failing_healthcheck( channel_size: usize, ) -> (impl Stream<Item = EventArray>, BasicSinkConfig) { - let (tx, rx) = SourceSender::new_with_buffer(0, channel_size); + let (tx, rx) = SourceSender::new_with_buffer(channel_size); let sink = BasicSinkConfig::new(tx, false); (rx.into_stream(), sink) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 696a39e47338c..8ed29565f3bd3 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -197,14 +197,14 @@ pub async fn build_pieces( let mut builder = { let _span = span.enter(); - SourceSender::builder(pipeline_source_id).with_buffer(*SOURCE_SENDER_BUFFER_SIZE) + SourceSender::builder().with_buffer(*SOURCE_SENDER_BUFFER_SIZE) }; let mut pumps = Vec::new(); let mut controls = HashMap::new(); let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); for output in source_outputs { - let mut rx = builder.add_output(output.clone()); + let mut rx = builder.add_output(output.clone(), pipeline_source_id); let (mut fanout, control) = Fanout::new(); let pump = async move { @@ -859,6 +859,7 @@ fn build_task_transform( count: events.len(), byte_size: events.estimated_json_encoded_size_of(), output: None, + source: None, }); }); let transform = async move { From f460c59833265dcc0b63f90137064a6937aecc35 Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Thu, 8 Dec 2022 14:25:03 +0100 Subject: [PATCH 04/12] fixup! checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- rfcs/2022-11-03-XXX-data-volume-insights.md | 81 --------------------- 1 file changed, 81 deletions(-) delete mode 100644 rfcs/2022-11-03-XXX-data-volume-insights.md diff --git a/rfcs/2022-11-03-XXX-data-volume-insights.md b/rfcs/2022-11-03-XXX-data-volume-insights.md deleted file mode 100644 index 0629f626a591d..0000000000000 --- a/rfcs/2022-11-03-XXX-data-volume-insights.md +++ /dev/null @@ -1,81 +0,0 @@ -# RFC <issue#> - <YYYY-MM-DD> - <title> - -One paragraph description of the change. - -## Context - -- Link to any previous issues, RFCs, or briefs (do not repeat that context in this RFC). - -## Cross cutting concerns - -- Link to any ongoing or future work relevant to this change. - -## Scope - -### In scope - -- List work being directly addressed with this RFC. - -### Out of scope - -- List work that is completely out of scope. Use this to keep discussions focused. Please note the "future changes" section at the bottom. - -## Pain - -- What internal or external *pain* are we solving? -- Do not cover benefits of your change, this is covered in the "Rationale" section. - -## Proposal - -### User Experience - -- Explain your change as if you were describing it to a Vector user. We should be able to share this section with a Vector user to solicit feedback. -- Does this change break backward compatibility? If so, what should users do to upgrade? - -### Implementation - -- Explain your change as if you were presenting it to the Vector team. -- When possible, demonstrate with pseudo code not text. -- Be specific. Be opinionated. Avoid ambiguity. - -## Rationale - -- Why is this change worth it? -- What is the impact of not doing this? -- How does this position us for success in the future? - -## Drawbacks - -- Why should we not do this? -- What kind on ongoing burden does this place on the team? - -## Prior Art - -- List prior art, the good and bad. -- Why can't we simply use or copy them? - -## Alternatives - -- What other approaches have been considered and why did you not choose them? -- How about not doing this at all? - -## Outstanding Questions - -- List any remaining questions. -- Use this to resolve ambiguity and collaborate with your team during the RFC process. -- *These must be resolved before the RFC can be merged.* - -## Plan Of Attack - -Incremental steps to execute this change. These will be converted to issues after the RFC is approved: - -- [ ] Submit a PR with spike-level code _roughly_ demonstrating the change. -- [ ] Incremental change #1 -- [ ] Incremental change #2 -- [ ] ... - -Note: This can be filled out during the review process. - -## Future Improvements - -- List any future improvements. Use this to keep your "plan of attack" scope small and project a sound design. From efa1c90aba8acebe5fca9590ad9583718382a9d3 Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Tue, 13 Dec 2022 16:14:48 +0100 Subject: [PATCH 05/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- lib/vector-core/src/event/ref.rs | 9 ++++ lib/vector-core/src/transform/mod.rs | 70 +++++++++++++++++++--------- src/source_sender/mod.rs | 19 ++++++-- src/topology/builder.rs | 3 +- 4 files changed, 74 insertions(+), 27 deletions(-) diff --git a/lib/vector-core/src/event/ref.rs b/lib/vector-core/src/event/ref.rs index 8f7cff91655b4..32c3cd50b1591 100644 --- a/lib/vector-core/src/event/ref.rs +++ b/lib/vector-core/src/event/ref.rs @@ -64,6 +64,15 @@ impl<'a> EventRef<'a> { _ => panic!("Failed type coercion, {:?} is not a metric reference", self), } } + + /// Access the metadata in this reference. + pub fn metadata(&self) -> &EventMetadata { + match self { + Self::Log(event) => event.metadata(), + Self::Metric(event) => event.metadata(), + Self::Trace(event) => event.metadata(), + } + } } impl<'a> From<&'a Event> for EventRef<'a> { diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index 19624db731547..00a14bb89b147 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -12,6 +12,7 @@ use crate::{ into_event_stream, EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventRef, }, fanout::{self, Fanout}, + topology::builder::SourceDetails, ByteSizeOf, }; @@ -220,10 +221,14 @@ pub struct TransformOutputs { outputs_spec: Vec<Output>, primary_output: Option<Fanout>, named_outputs: HashMap<String, Fanout>, + sources_details: Vec<SourceDetails>, } impl TransformOutputs { - pub fn new(outputs_in: Vec<Output>) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) { + pub fn new( + outputs_in: Vec<Output>, + sources_details: Vec<SourceDetails>, + ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) { let outputs_spec = outputs_in.clone(); let mut primary_output = None; let mut named_outputs = HashMap::new(); @@ -247,6 +252,7 @@ impl TransformOutputs { outputs_spec, primary_output, named_outputs, + sources_details, }; (me, controls) @@ -267,22 +273,31 @@ impl TransformOutputs { buf: &mut TransformOutputsBuf, ) -> Result<(), Box<dyn error::Error + Send + Sync>> { if let Some(primary) = self.primary_output.as_mut() { - let count = buf.primary_buffer.as_ref().map_or(0, OutputBuffer::len); - let byte_size = buf.primary_buffer.as_ref().map_or( - 0, - EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of, - ); - buf.primary_buffer - .as_mut() - .expect("mismatched outputs") - .send(primary) - .await?; - emit(EventsSent { - count, - byte_size, - output: Some(DEFAULT_OUTPUT), - source: None, + let primary_buffer = buf.primary_buffer.as_mut().expect("mismatched outputs"); + + let count = primary_buffer.len(); + let byte_size = primary_buffer.estimated_json_encoded_size_of(); + + primary_buffer.send(primary).await?; + + let mut sources: HashMap<Option<usize>, usize> = HashMap::new(); + primary_buffer.iter_events().for_each(|event| { + sources + .entry(event.metadata().source_id()) + .and_modify(|i| *i += 1) + .or_insert(1); }); + + for (source_id, count) in sources { + emit(EventsSent { + count, + byte_size, + output: Some(DEFAULT_OUTPUT), + source: source_id.and_then(|id| { + self.sources_details.get(id).map(|details| details.key.id()) + }), + }); + } } for (key, buf) in &mut buf.named_buffers { @@ -290,12 +305,25 @@ impl TransformOutputs { let byte_size = buf.estimated_json_encoded_size_of(); buf.send(self.named_outputs.get_mut(key).expect("unknown output")) .await?; - emit(EventsSent { - count, - byte_size, - output: Some(key.as_ref()), - source: None, + + let mut sources: HashMap<Option<usize>, usize> = HashMap::new(); + buf.iter_events().for_each(|event| { + sources + .entry(event.metadata().source_id()) + .and_modify(|i| *i += 1) + .or_insert(1); }); + + for (source_id, count) in sources { + emit(EventsSent { + count, + byte_size, + output: Some(key.as_ref()), + source: source_id.and_then(|id| { + self.sources_details.get(id).map(|details| details.key.id()) + }), + }); + } } Ok(()) diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 3191cc870fd4f..3e76befec4709 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -44,7 +44,11 @@ impl Builder { } } - pub fn add_output(&mut self, output: Output, source_id: usize) -> LimitedReceiver<EventArray> { + pub fn add_output( + &mut self, + output: Output, + source_id: (usize, String), + ) -> LimitedReceiver<EventArray> { match output.port { None => { let (inner, rx) = Inner::new_with_buffer( @@ -226,10 +230,15 @@ impl SourceSender { #[derive(Clone)] struct Inner { - source_id: usize, inner: LimitedSender<EventArray>, output: String, lag_time: Option<Histogram>, + + /// The unique ID of the source to which this sender belongs. + /// + /// - The `usize` is the ID attached to individual event metadata. + /// - The `String` is used in the `EventsSent` internal event triggered by the sender. + source_id: (usize, String), } impl fmt::Debug for Inner { @@ -247,7 +256,7 @@ impl Inner { n: usize, output: String, lag_time: Option<Histogram>, - source_id: usize, + source_id: (usize, String), ) -> (Self, LimitedReceiver<EventArray>) { let (tx, rx) = channel::limited(n); ( @@ -265,7 +274,7 @@ impl Inner { let reference = Utc::now().timestamp_millis(); events.iter_events_mut().for_each(|mut event| { self.emit_lag_time(event.as_event_ref(), reference); - event.metadata_mut().set_source_id(self.source_id); + event.metadata_mut().set_source_id(self.source_id.0); }); let byte_size = events.estimated_json_encoded_size_of(); let count = events.len(); @@ -274,7 +283,7 @@ impl Inner { count, byte_size, output: Some(self.output.as_ref()), - source: None, + source: Some(&self.source_id.1), }); Ok(()) } diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 8ed29565f3bd3..f209a8346a6ed 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -204,7 +204,8 @@ pub async fn build_pieces( let mut schema_definitions = HashMap::with_capacity(source_outputs.len()); for output in source_outputs { - let mut rx = builder.add_output(output.clone(), pipeline_source_id); + let mut rx = + builder.add_output(output.clone(), (pipeline_source_id, key.id().to_owned())); let (mut fanout, control) = Fanout::new(); let pump = async move { From 8520d6dd3a7f6d0783a9121b79a3b1d9340080bf Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Tue, 20 Dec 2022 12:32:14 +0100 Subject: [PATCH 06/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- lib/vector-common/src/config.rs | 5 +++ lib/vector-core/src/event/ref.rs | 12 ++++++ lib/vector-core/src/transform/mod.rs | 33 +++++++++------- src/config/sink.rs | 2 +- src/topology/builder.rs | 56 +++++++++++++++++----------- 5 files changed, 72 insertions(+), 36 deletions(-) diff --git a/lib/vector-common/src/config.rs b/lib/vector-common/src/config.rs index 1c3f568d00880..8143332b260d3 100644 --- a/lib/vector-common/src/config.rs +++ b/lib/vector-common/src/config.rs @@ -80,6 +80,11 @@ impl PartialOrd for ComponentKey { impl ConfigurableString for ComponentKey {} +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct SourceDetails { + pub key: ComponentKey, +} + #[cfg(test)] mod tests { use super::*; diff --git a/lib/vector-core/src/event/ref.rs b/lib/vector-core/src/event/ref.rs index 32c3cd50b1591..85324d5d82adf 100644 --- a/lib/vector-core/src/event/ref.rs +++ b/lib/vector-core/src/event/ref.rs @@ -2,6 +2,8 @@ use vector_common::EventDataEq; +use crate::EstimatedJsonEncodedSizeOf; + use super::{Event, EventMetadata, LogEvent, Metric, TraceEvent}; /// A wrapper for references to inner event types, where reconstituting @@ -114,6 +116,16 @@ impl<'a> EventDataEq<Event> for EventRef<'a> { } } +impl<'a> EstimatedJsonEncodedSizeOf for EventRef<'a> { + fn estimated_json_encoded_size_of(&self) -> usize { + match self { + EventRef::Log(v) => v.estimated_json_encoded_size_of(), + EventRef::Metric(v) => v.estimated_json_encoded_size_of(), + EventRef::Trace(v) => v.estimated_json_encoded_size_of(), + } + } +} + /// A wrapper for mutable references to inner event types, where reconstituting /// a full `Event` from a `LogEvent` or `Metric` might be inconvenient. #[derive(Debug)] diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index 00a14bb89b147..ac66adfbb37b4 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -2,6 +2,7 @@ use std::{collections::HashMap, error, pin::Pin}; use futures::{Stream, StreamExt}; use vector_common::{ + config::SourceDetails, internal_event::{emit, EventsSent, DEFAULT_OUTPUT}, EventDataEq, }; @@ -12,7 +13,6 @@ use crate::{ into_event_stream, EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventRef, }, fanout::{self, Fanout}, - topology::builder::SourceDetails, ByteSizeOf, }; @@ -275,20 +275,22 @@ impl TransformOutputs { if let Some(primary) = self.primary_output.as_mut() { let primary_buffer = buf.primary_buffer.as_mut().expect("mismatched outputs"); - let count = primary_buffer.len(); - let byte_size = primary_buffer.estimated_json_encoded_size_of(); - primary_buffer.send(primary).await?; - let mut sources: HashMap<Option<usize>, usize> = HashMap::new(); + let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new(); primary_buffer.iter_events().for_each(|event| { + let size = event.estimated_json_encoded_size_of(); + sources .entry(event.metadata().source_id()) - .and_modify(|i| *i += 1) - .or_insert(1); + .and_modify(|(count, byte_size)| { + *count += 1; + *byte_size += size; + }) + .or_insert((1, size)); }); - for (source_id, count) in sources { + for (source_id, (count, byte_size)) in sources { emit(EventsSent { count, byte_size, @@ -301,20 +303,23 @@ impl TransformOutputs { } for (key, buf) in &mut buf.named_buffers { - let count = buf.len(); - let byte_size = buf.estimated_json_encoded_size_of(); buf.send(self.named_outputs.get_mut(key).expect("unknown output")) .await?; - let mut sources: HashMap<Option<usize>, usize> = HashMap::new(); + let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new(); buf.iter_events().for_each(|event| { + let size = event.estimated_json_encoded_size_of(); + sources .entry(event.metadata().source_id()) - .and_modify(|i| *i += 1) - .or_insert(1); + .and_modify(|(count, byte_size)| { + *count += 1; + *byte_size += size; + }) + .or_insert((1, size)); }); - for (source_id, count) in sources { + for (source_id, (count, byte_size)) in sources { emit(EventsSent { count, byte_size, diff --git a/src/config/sink.rs b/src/config/sink.rs index 0addc95c5f4f7..e10e6739ce22f 100644 --- a/src/config/sink.rs +++ b/src/config/sink.rs @@ -2,6 +2,7 @@ use async_trait::async_trait; use enum_dispatch::enum_dispatch; use serde::Serialize; use vector_buffers::{BufferConfig, BufferType}; +use vector_common::config::SourceDetails; use vector_config::{configurable_component, Configurable, NamedComponent}; use vector_core::{ config::{AcknowledgementsConfig, GlobalOptions, Input}, @@ -11,7 +12,6 @@ use vector_core::{ use super::{id::Inputs, schema, ComponentKey, ProxyConfig, Resource}; use crate::{ sinks::{util::UriSerde, Healthcheck, Sinks}, - topology::builder::SourceDetails, }; /// Fully resolved sink component. diff --git a/src/topology/builder.rs b/src/topology/builder.rs index f209a8346a6ed..91903dcdbf5de 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -16,6 +16,7 @@ use tokio::{ time::{timeout, Duration}, }; use tracing::Instrument; +use vector_common::config::SourceDetails; use vector_config::NamedComponent; use vector_core::config::LogNamespace; use vector_core::{ @@ -26,7 +27,7 @@ use vector_core::{ }, BufferType, WhenFull, }, - internal_event::EventsSent, + internal_event::{emit, EventsSent}, schema::Definition, EstimatedJsonEncodedSizeOf, }; @@ -136,16 +137,6 @@ pub struct Pieces { pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>, } -// #[derive(Debug, Clone, Default, Eq, PartialEq)] -// pub struct PipelineDetails { -// pub sources: Vec<SourceDetails>, -// } - -#[derive(Debug, Clone, Eq, PartialEq)] -pub struct SourceDetails { - pub key: ComponentKey, -} - /// Builds only the new pieces, and doesn't check their topology. pub async fn build_pieces( config: &super::Config, @@ -404,7 +395,8 @@ pub async fn build_pieces( inputs.insert(key.clone(), (input_tx, node.inputs.clone())); - let (transform_task, transform_outputs) = build_transform(transform, node, input_rx); + let (transform_task, transform_outputs) = + build_transform(transform, node, input_rx, sources_details.clone()); outputs.extend(transform_outputs); tasks.insert(key.clone(), transform_task); @@ -640,17 +632,21 @@ fn build_transform( transform: Transform, node: TransformNode, input_rx: BufferReceiver<EventArray>, + sources_details: Vec<SourceDetails>, ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) { match transform { // TODO: avoid the double boxing for function transforms here - Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx), - Transform::Synchronous(t) => build_sync_transform(t, node, input_rx), + Transform::Function(t) => { + build_sync_transform(Box::new(t), node, input_rx, sources_details) + } + Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, sources_details), Transform::Task(t) => build_task_transform( t, input_rx, node.input_details.data_type(), node.typetag, &node.key, + sources_details, ), } } @@ -659,8 +655,9 @@ fn build_sync_transform( t: Box<dyn SyncTransform>, node: TransformNode, input_rx: BufferReceiver<EventArray>, + sources_details: Vec<SourceDetails>, ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) { - let (outputs, controls) = TransformOutputs::new(node.outputs); + let (outputs, controls) = TransformOutputs::new(node.outputs, sources_details); let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs); let transform = if node.enable_concurrency { @@ -840,6 +837,7 @@ fn build_task_transform( input_type: DataType, typetag: &str, key: &ComponentKey, + sources_details: Vec<SourceDetails>, ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) { let (mut fanout, control) = Fanout::new(); @@ -855,13 +853,29 @@ fn build_task_transform( }); let stream = t .transform(Box::pin(filtered)) - .inspect(|events: &EventArray| { - emit!(EventsSent { - count: events.len(), - byte_size: events.estimated_json_encoded_size_of(), - output: None, - source: None, + .inspect(move |events: &EventArray| { + let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new(); + events.iter_events().for_each(|event| { + let size = event.estimated_json_encoded_size_of(); + + sources + .entry(event.metadata().source_id()) + .and_modify(|(count, byte_size)| { + *count += 1; + *byte_size += size; + }) + .or_insert((1, size)); }); + + for (source_id, (count, byte_size)) in sources { + emit(EventsSent { + count, + byte_size, + output: None, + source: source_id + .and_then(|id| sources_details.get(id).map(|details| details.key.id())), + }); + } }); let transform = async move { debug!("Task transform starting."); From 191b509bf5f8115e704ec0305fa097bad8432253 Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Tue, 20 Dec 2022 12:55:11 +0100 Subject: [PATCH 07/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- src/source_sender/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/source_sender/mod.rs b/src/source_sender/mod.rs index 3e76befec4709..831ae3e25adeb 100644 --- a/src/source_sender/mod.rs +++ b/src/source_sender/mod.rs @@ -330,7 +330,7 @@ impl Inner { count, byte_size, output: Some(self.output.as_ref()), - source: None, + source: Some(&self.source_id.1), }); return Err(error.into()); } @@ -341,7 +341,7 @@ impl Inner { count, byte_size, output: Some(self.output.as_ref()), - source: None, + source: Some(&self.source_id.1), }); Ok(()) From c31937ba7606742177513ce7ca902cf924f9ffe2 Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Tue, 20 Dec 2022 13:07:50 +0100 Subject: [PATCH 08/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- src/sinks/prometheus/exporter.rs | 40 +++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index 1a8c5706240c8..beb58d2f3e584 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -19,6 +19,7 @@ use serde_with::serde_as; use snafu::Snafu; use stream_cancel::{Trigger, Tripwire}; use tracing::{Instrument, Span}; +use vector_common::config::SourceDetails; use vector_config::configurable_component; use vector_core::{ internal_event::{ @@ -180,7 +181,7 @@ impl GenerateConfig for PrometheusExporterConfig { #[async_trait::async_trait] impl SinkConfig for PrometheusExporterConfig { - async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { if self.flush_period_secs.as_secs() < MIN_FLUSH_PERIOD_SECS { return Err(Box::new(BuildError::FlushPeriodTooShort { min: MIN_FLUSH_PERIOD_SECS, @@ -189,7 +190,8 @@ impl SinkConfig for PrometheusExporterConfig { validate_quantiles(&self.quantiles)?; - let sink = PrometheusExporter::new(self.clone()); + let sources_details = ctx.sources_details; + let sink = PrometheusExporter::new(self.clone(), sources_details); let healthcheck = future::ok(()).boxed(); Ok((VectorSink::from_event_streamsink(sink), healthcheck)) @@ -212,6 +214,7 @@ struct PrometheusExporter { server_shutdown_trigger: Option<Trigger>, config: PrometheusExporterConfig, metrics: Arc<RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>>, + sources_details: Vec<SourceDetails>, } /// Expiration metadata for a metric. @@ -378,6 +381,7 @@ fn handle( quantiles: &[f64], metrics: &RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>, bytes_sent: &Registered<BytesSent>, + sources_details: &[SourceDetails], ) -> Response<Body> { let mut response = Response::new(Body::empty()); @@ -400,8 +404,19 @@ fn handle( .sum(); let mut collector = StringCollector::new(); + let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new(); for (_, (metric, _)) in metrics.iter() { + let size = event.estimated_json_encoded_size_of(); + + sources + .entry(event.metadata().source_id()) + .and_modify(|(count, byte_size)| { + *count += 1; + *byte_size += size; + }) + .or_insert((1, size)); + collector.encode_metric(default_namespace, buckets, quantiles, metric); } @@ -419,12 +434,15 @@ fn handle( bytes_sent.emit(ByteSize(body_size)); - emit!(EventsSent { - count, - byte_size, - output: None, - source: None, - }); + for (source_id, (count, byte_size)) in sources { + emit(EventsSent { + count, + byte_size, + output: None, + source: source_id + .and_then(|id| sources_details.get(id).map(|details| details.key.id())), + }); + } } (true, _, _) => { @@ -436,11 +454,12 @@ fn handle( } impl PrometheusExporter { - fn new(config: PrometheusExporterConfig) -> Self { + fn new(config: PrometheusExporterConfig, sources_details: Vec<SourceDetails>) -> Self { Self { server_shutdown_trigger: None, config, metrics: Arc::new(RwLock::new(IndexMap::new())), + sources_details, } } @@ -457,6 +476,7 @@ impl PrometheusExporter { let buckets = self.config.buckets.clone(); let quantiles = self.config.quantiles.clone(); let auth = self.config.auth.clone(); + let sources_details = self.sources_details.clone(); let new_service = make_service_fn(move |_| { let span = Span::current(); @@ -466,6 +486,7 @@ impl PrometheusExporter { let quantiles = quantiles.clone(); let bytes_sent = bytes_sent.clone(); let auth = auth.clone(); + let sources_details = sources_details.clone(); async move { Ok::<_, Infallible>(service_fn(move |req| { @@ -478,6 +499,7 @@ impl PrometheusExporter { &quantiles, &metrics, &bytes_sent, + &sources_details, ); emit!(PrometheusServerRequestComplete { From d6d2f6b4cafb9a061a1bb27b81a512214af8eb00 Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Tue, 20 Dec 2022 13:55:00 +0100 Subject: [PATCH 09/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- src/sinks/pulsar.rs | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs index 0ab042bfd5dfd..dd6032439037c 100644 --- a/src/sinks/pulsar.rs +++ b/src/sinks/pulsar.rs @@ -24,6 +24,7 @@ use snafu::{ResultExt, Snafu}; use tokio_util::codec::Encoder as _; use value::Value; use vector_common::{ + config::SourceDetails, internal_event::{ ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol, Registered, }, @@ -31,7 +32,7 @@ use vector_common::{ sensitive_string::SensitiveString, }; use vector_config::configurable_component; -use vector_core::config::log_schema; +use vector_core::{config::log_schema, event::EventMetadata}; #[derive(Debug, Snafu)] enum BuildError { @@ -121,6 +122,7 @@ enum PulsarSinkState { Result<SendFuture, PulsarError>, RequestMetadata, EventFinalizers, + Option<usize>, ), >, ), @@ -138,10 +140,12 @@ struct PulsarSink { Result<CommandSendReceipt, PulsarError>, RequestMetadata, EventFinalizers, + Option<String>, ), >, >, bytes_sent: Registered<BytesSent>, + sources_details: Vec<SourceDetails>, } impl GenerateConfig for PulsarSinkConfig { @@ -162,7 +166,7 @@ impl GenerateConfig for PulsarSinkConfig { impl SinkConfig for PulsarSinkConfig { async fn build( &self, - _cx: SinkContext, + ctx: SinkContext, ) -> crate::Result<(super::VectorSink, super::Healthcheck)> { let producer = self .create_pulsar_producer() @@ -172,12 +176,14 @@ impl SinkConfig for PulsarSinkConfig { let transformer = self.encoding.transformer(); let serializer = self.encoding.build()?; let encoder = Encoder::<()>::new(serializer); + let sources_details = ctx.sources_details.clone(); let sink = PulsarSink::new( producer, transformer, encoder, self.partition_key_field.clone(), + sources_details, )?; let producer = self @@ -257,6 +263,7 @@ impl PulsarSink { transformer: Transformer, encoder: Encoder<()>, partition_key_field: Option<String>, + sources_details: Vec<SourceDetails>, ) -> crate::Result<Self> { Ok(Self { transformer, @@ -265,12 +272,13 @@ impl PulsarSink { in_flight: FuturesUnordered::new(), bytes_sent: register!(BytesSent::from(Protocol::TCP)), partition_key_field, + sources_details, }) } fn poll_in_flight_prepare(&mut self, cx: &mut Context<'_>) -> Poll<()> { if let PulsarSinkState::Sending(fut) = &mut self.state { - let (producer, result, metadata, finalizers) = ready!(fut.as_mut().poll(cx)); + let (producer, result, metadata, finalizers, source_id) = ready!(fut.as_mut().poll(cx)); self.state = PulsarSinkState::Ready(producer); self.in_flight.push(Box::pin(async move { @@ -278,7 +286,14 @@ impl PulsarSink { Ok(fut) => fut.await, Err(error) => Err(error), }; - (result, metadata, finalizers) + + let source = source_id.and_then(|id| { + self.sources_details + .get(id) + .map(|details| details.key.id().to_owned()) + }); + + (result, metadata, finalizers, source) })); } @@ -318,6 +333,8 @@ impl Sink<Event> for PulsarSink { let metadata_builder = RequestMetadataBuilder::from_events(&event); self.transformer.transform(&mut event); + let source_id = event.metadata().source_id(); + let finalizers = event.take_finalizers(); let mut bytes = BytesMut::new(); self.encoder.encode(event, &mut bytes).map_err(|_| { @@ -346,7 +363,7 @@ impl Sink<Event> for PulsarSink { builder = builder.with_key(key); }; let result = builder.send().await; - (producer, result, metadata, finalizers) + (producer, result, metadata, finalizers, source_key) })), ); @@ -359,7 +376,7 @@ impl Sink<Event> for PulsarSink { let this = Pin::into_inner(self); while !this.in_flight.is_empty() { match ready!(Pin::new(&mut this.in_flight).poll_next(cx)) { - Some((Ok(result), metadata, finalizers)) => { + Some((Ok(result), metadata, finalizers, source)) => { trace!( message = "Pulsar sink produced message.", message_id = ?result.message_id, @@ -373,13 +390,13 @@ impl Sink<Event> for PulsarSink { count: metadata.event_count(), byte_size: metadata.events_estimated_json_encoded_byte_size(), output: None, - source: None, + source, }); this.bytes_sent .emit(ByteSize(metadata.request_encoded_size())); } - Some((Err(error), metadata, finalizers)) => { + Some((Err(error), metadata, finalizers, _)) => { finalizers.update_status(EventStatus::Errored); emit!(PulsarSendingError { error: Box::new(error), From 41f764cdca862199eeaa47f3cb5542d324d698ed Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Wed, 21 Dec 2022 13:35:55 +0100 Subject: [PATCH 10/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- src/sinks/blackhole/config.rs | 5 ++-- src/sinks/blackhole/sink.rs | 49 +++++++++++++++++++++----------- src/sinks/file/mod.rs | 2 +- src/sinks/nats.rs | 6 ++-- src/sinks/prometheus/exporter.rs | 13 +++------ src/sinks/pulsar.rs | 6 ++-- src/sinks/websocket/sink.rs | 2 +- 7 files changed, 48 insertions(+), 35 deletions(-) diff --git a/src/sinks/blackhole/config.rs b/src/sinks/blackhole/config.rs index 5777afab4d7a4..b10c88483bf8b 100644 --- a/src/sinks/blackhole/config.rs +++ b/src/sinks/blackhole/config.rs @@ -39,8 +39,9 @@ pub struct BlackholeConfig { #[async_trait::async_trait] impl SinkConfig for BlackholeConfig { - async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let sink = BlackholeSink::new(self.clone()); + async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let sources_details = ctx.sources_details.clone(); + let sink = BlackholeSink::new(self.clone(), sources_details); let healthcheck = future::ok(()).boxed(); Ok((VectorSink::Stream(Box::new(sink)), healthcheck)) diff --git a/src/sinks/blackhole/sink.rs b/src/sinks/blackhole/sink.rs index 12e5aa21b7984..85af4b678f4c0 100644 --- a/src/sinks/blackhole/sink.rs +++ b/src/sinks/blackhole/sink.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -13,6 +14,7 @@ use tokio::{ sync::watch, time::{interval, sleep_until}, }; +use vector_common::config::SourceDetails; use vector_core::{ internal_event::{BytesSent, EventsSent}, EstimatedJsonEncodedSizeOf, @@ -28,15 +30,17 @@ pub struct BlackholeSink { total_raw_bytes: Arc<AtomicUsize>, config: BlackholeConfig, last: Option<Instant>, + sources_details: Vec<SourceDetails>, } impl BlackholeSink { - pub fn new(config: BlackholeConfig) -> Self { + pub fn new(config: BlackholeConfig, sources_details: Vec<SourceDetails>) -> Self { BlackholeSink { config, total_events: Arc::new(AtomicUsize::new(0)), total_raw_bytes: Arc::new(AtomicUsize::new(0)), last: None, + sources_details, } } } @@ -85,24 +89,37 @@ impl StreamSink<EventArray> for BlackholeSink { self.last = Some(until); } - let message_len = events.estimated_json_encoded_size_of(); + let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new(); + events.iter_events().for_each(|event| { + let size = event.estimated_json_encoded_size_of(); - let _ = self.total_events.fetch_add(events.len(), Ordering::AcqRel); - let _ = self - .total_raw_bytes - .fetch_add(message_len, Ordering::AcqRel); - - emit!(EventsSent { - count: events.len(), - byte_size: message_len, - output: None, - source: None, + sources + .entry(event.metadata().source_id()) + .and_modify(|(count, byte_size)| { + *count += 1; + *byte_size += size; + }) + .or_insert((1, size)); }); - emit!(BytesSent { - byte_size: message_len, - protocol: "blackhole".to_string().into(), - }); + for (source_id, (count, byte_size)) in sources { + let _ = self.total_events.fetch_add(events.len(), Ordering::AcqRel); + let _ = self.total_raw_bytes.fetch_add(byte_size, Ordering::AcqRel); + + emit!(EventsSent { + count, + byte_size, + output: None, + source: source_id.and_then(|id| { + self.sources_details.get(id).map(|details| details.key.id()) + }), + }); + + emit!(BytesSent { + byte_size, + protocol: "blackhole".to_string().into(), + }); + } } // Notify the reporting task to shutdown. diff --git a/src/sinks/file/mod.rs b/src/sinks/file/mod.rs index 36fe4eb57e3be..4cd7a68cb305d 100644 --- a/src/sinks/file/mod.rs +++ b/src/sinks/file/mod.rs @@ -17,6 +17,7 @@ use tokio::{ io::AsyncWriteExt, }; use tokio_util::codec::Encoder as _; +use vector_common::config::SourceDetails; use vector_config::configurable_component; use vector_core::{internal_event::EventsSent, EstimatedJsonEncodedSizeOf}; @@ -28,7 +29,6 @@ use crate::{ internal_events::{FileBytesSent, FileIoError, FileOpen, TemplateRenderingError}, sinks::util::StreamSink, template::Template, - topology::builder::SourceDetails, }; mod bytes_path; use std::convert::TryFrom; diff --git a/src/sinks/nats.rs b/src/sinks/nats.rs index 8526f11ab1bad..d16ac9a89b33b 100644 --- a/src/sinks/nats.rs +++ b/src/sinks/nats.rs @@ -6,8 +6,9 @@ use codecs::JsonSerializerConfig; use futures::{stream::BoxStream, FutureExt, StreamExt, TryFutureExt}; use snafu::{ResultExt, Snafu}; use tokio_util::codec::Encoder as _; -use vector_common::internal_event::{ - ByteSize, BytesSent, EventsSent, InternalEventHandle, Protocol, +use vector_common::{ + config::SourceDetails, + internal_event::{ByteSize, BytesSent, EventsSent, InternalEventHandle, Protocol}, }; use vector_config::configurable_component; @@ -20,7 +21,6 @@ use crate::{ sinks::util::StreamSink, template::{Template, TemplateParseError}, tls::TlsEnableableConfig, - topology::builder::SourceDetails, }; #[derive(Debug, Snafu)] diff --git a/src/sinks/prometheus/exporter.rs b/src/sinks/prometheus/exporter.rs index beb58d2f3e584..63661854ef739 100644 --- a/src/sinks/prometheus/exporter.rs +++ b/src/sinks/prometheus/exporter.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, convert::Infallible, hash::Hash, mem::{discriminant, Discriminant}, @@ -23,7 +24,7 @@ use vector_common::config::SourceDetails; use vector_config::configurable_component; use vector_core::{ internal_event::{ - ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol, Registered, + emit, ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol, Registered, }, ByteSizeOf, EstimatedJsonEncodedSizeOf, }; @@ -397,20 +398,14 @@ fn handle( (true, &Method::GET, "/metrics") => { let metrics = metrics.read().expect(LOCK_FAILED); - let count = metrics.len(); - let byte_size = metrics - .iter() - .map(|(_, (metric, _))| metric.estimated_json_encoded_size_of()) - .sum(); - let mut collector = StringCollector::new(); let mut sources: HashMap<Option<usize>, (usize, usize)> = HashMap::new(); for (_, (metric, _)) in metrics.iter() { - let size = event.estimated_json_encoded_size_of(); + let size = metric.estimated_json_encoded_size_of(); sources - .entry(event.metadata().source_id()) + .entry(metric.metadata().source_id()) .and_modify(|(count, byte_size)| { *count += 1; *byte_size += size; diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs index dd6032439037c..2460da9bdd42b 100644 --- a/src/sinks/pulsar.rs +++ b/src/sinks/pulsar.rs @@ -32,7 +32,7 @@ use vector_common::{ sensitive_string::SensitiveString, }; use vector_config::configurable_component; -use vector_core::{config::log_schema, event::EventMetadata}; +use vector_core::config::log_schema; #[derive(Debug, Snafu)] enum BuildError { @@ -363,7 +363,7 @@ impl Sink<Event> for PulsarSink { builder = builder.with_key(key); }; let result = builder.send().await; - (producer, result, metadata, finalizers, source_key) + (producer, result, metadata, finalizers, source_id) })), ); @@ -390,7 +390,7 @@ impl Sink<Event> for PulsarSink { count: metadata.event_count(), byte_size: metadata.events_estimated_json_encoded_byte_size(), output: None, - source, + source: source.as_deref(), }); this.bytes_sent diff --git a/src/sinks/websocket/sink.rs b/src/sinks/websocket/sink.rs index c85ff2aee11e2..dc2010b2aec6b 100644 --- a/src/sinks/websocket/sink.rs +++ b/src/sinks/websocket/sink.rs @@ -23,6 +23,7 @@ use tokio_tungstenite::{ WebSocketStream as WsStream, }; use tokio_util::codec::Encoder as _; +use vector_common::config::SourceDetails; use vector_core::{ internal_event::{ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol}, EstimatedJsonEncodedSizeOf, @@ -41,7 +42,6 @@ use crate::{ sinks::util::{retries::ExponentialBackoff, StreamSink}, sinks::websocket::config::WebSocketSinkConfig, tls::{MaybeTlsSettings, MaybeTlsStream, TlsError}, - topology::builder::SourceDetails, }; #[derive(Debug, Snafu)] From f384eb15014f9021f889e1dfe1204fd1ae7f569b Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Wed, 21 Dec 2022 13:53:52 +0100 Subject: [PATCH 11/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- src/sinks/pulsar.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/sinks/pulsar.rs b/src/sinks/pulsar.rs index 2460da9bdd42b..c9d0858bb14b7 100644 --- a/src/sinks/pulsar.rs +++ b/src/sinks/pulsar.rs @@ -280,6 +280,12 @@ impl PulsarSink { if let PulsarSinkState::Sending(fut) = &mut self.state { let (producer, result, metadata, finalizers, source_id) = ready!(fut.as_mut().poll(cx)); + let source = source_id.and_then(|id| { + self.sources_details + .get(id) + .map(|details| details.key.id().to_owned()) + }); + self.state = PulsarSinkState::Ready(producer); self.in_flight.push(Box::pin(async move { let result = match result { @@ -287,12 +293,6 @@ impl PulsarSink { Err(error) => Err(error), }; - let source = source_id.and_then(|id| { - self.sources_details - .get(id) - .map(|details| details.key.id().to_owned()) - }); - (result, metadata, finalizers, source) })); } From 9ee3c6c2c48261645c44a393c08a5e0b54d8327b Mon Sep 17 00:00:00 2001 From: Jean Mertz <git@jeanmertz.com> Date: Wed, 21 Dec 2022 14:14:12 +0100 Subject: [PATCH 12/12] checkpoint Signed-off-by: Jean Mertz <git@jeanmertz.com> --- src/sinks/console/config.rs | 5 ++++- src/sinks/console/sink.rs | 9 ++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/sinks/console/config.rs b/src/sinks/console/config.rs index 7e4ca5e44fb59..bb81659b5ef62 100644 --- a/src/sinks/console/config.rs +++ b/src/sinks/console/config.rs @@ -60,7 +60,8 @@ impl GenerateConfig for ConsoleSinkConfig { #[async_trait::async_trait] impl SinkConfig for ConsoleSinkConfig { - async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + async fn build(&self, ctx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let sources_details = ctx.sources_details.clone(); let transformer = self.encoding.transformer(); let (framer, serializer) = self.encoding.build(SinkType::StreamBased)?; let encoder = Encoder::<Framer>::new(framer, serializer); @@ -70,11 +71,13 @@ impl SinkConfig for ConsoleSinkConfig { output: io::stdout(), transformer, encoder, + sources_details, }), Target::Stderr => VectorSink::from_event_streamsink(WriterSink { output: io::stderr(), transformer, encoder, + sources_details, }), }; diff --git a/src/sinks/console/sink.rs b/src/sinks/console/sink.rs index 38214c4c11bde..5c64cbeb01f0c 100644 --- a/src/sinks/console/sink.rs +++ b/src/sinks/console/sink.rs @@ -4,6 +4,7 @@ use codecs::encoding::Framer; use futures::{stream::BoxStream, StreamExt}; use tokio::{io, io::AsyncWriteExt}; use tokio_util::codec::Encoder as _; +use vector_common::config::SourceDetails; use vector_core::{ internal_event::{ByteSize, BytesSent, EventsSent, InternalEventHandle as _, Protocol}, EstimatedJsonEncodedSizeOf, @@ -19,6 +20,7 @@ pub struct WriterSink<T> { pub output: T, pub transformer: Transformer, pub encoder: Encoder<Framer>, + pub sources_details: Vec<SourceDetails>, } #[async_trait] @@ -30,6 +32,11 @@ where let bytes_sent = register!(BytesSent::from(Protocol("console".into(),))); while let Some(mut event) = input.next().await { let event_byte_size = event.estimated_json_encoded_size_of(); + let source = event + .metadata() + .source_id() + .and_then(|id| self.sources_details.get(id).map(|details| details.key.id())); + self.transformer.transform(&mut event); let finalizers = event.take_finalizers(); @@ -54,7 +61,7 @@ where byte_size: event_byte_size, count: 1, output: None, - source: None, + source, }); bytes_sent.emit(ByteSize(bytes.len())); }