From 09c562a1a885c123bf6eb17095f88b679ac64c41 Mon Sep 17 00:00:00 2001 From: neuronull Date: Tue, 11 Jul 2023 10:59:14 -0600 Subject: [PATCH 01/11] add fix and small refactor --- src/components/validation/resources/event.rs | 57 ++- src/components/validation/resources/http.rs | 4 +- src/components/validation/resources/mod.rs | 2 +- src/components/validation/runner/io.rs | 6 +- src/components/validation/runner/mod.rs | 2 +- src/components/validation/runner/telemetry.rs | 44 +- .../validators/component_spec/mod.rs | 14 +- .../validators/component_spec/sources.rs | 375 ++++++------------ 8 files changed, 222 insertions(+), 282 deletions(-) diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index 21c38e58bfad5..fe61f782bafbd 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -1,10 +1,12 @@ use serde::Deserialize; +use snafu::Snafu; use vector_core::event::{Event, LogEvent}; -/// An event used in a test case. +/// An raw test case event for deserialization from yaml file. +/// This is an intermediary step to TestEvent. #[derive(Clone, Debug, Deserialize)] #[serde(untagged)] -pub enum TestEvent { +pub enum RawTestEvent { /// The event is used, as-is, without modification. Passthrough(EventData), @@ -20,12 +22,51 @@ pub enum TestEvent { Modified { modified: bool, event: EventData }, } -impl TestEvent { - pub fn into_event(self) -> Event { - match self { - Self::Passthrough(event) => event.into_event(), - Self::Modified { event, .. } => event.into_event(), - } +/// An event used in a test case. +#[derive(Clone, Debug, Deserialize)] +#[serde(try_from = "RawTestEvent")] +#[serde(untagged)] +pub enum TestEvent { + /// The event is used, as-is, without modification. + Passthrough(Event), + + /// The event is potentially modified by the external resource. + /// + /// The modification made is dependent on the external resource, but this mode is made available + /// for when a test case wants to exercise the failure path, but cannot cause a failure simply + /// by constructing the event in a certain way i.e. adding an invalid field, or removing a + /// required field, or using an invalid field value, and so on. + /// + /// For transforms and sinks, generally, the only way to cause an error is if the event itself + /// is malformed in some way, which can be achieved without this test event variant. + Modified { modified: bool, event: Event }, +} + +// impl TestEvent { +// pub fn into_event(self) -> Event { +// match self { +// Self::Passthrough(event) => event.into_event(), +// Self::Modified { event, .. } => event.into_event(), +// } +// } +// } + +#[derive(Clone, Debug, Eq, PartialEq, Snafu)] +pub enum RawTestEventParseError {} + +impl TryFrom for TestEvent { + type Error = RawTestEventParseError; + + fn try_from(other: RawTestEvent) -> Result { + Ok(match other { + RawTestEvent::Passthrough(event_data) => { + TestEvent::Passthrough(event_data.into_event()) + } + RawTestEvent::Modified { modified, event } => TestEvent::Modified { + modified, + event: event.into_event(), + }, + }) } } diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index 8fa066b6f3f4a..64ca6e04fd4bd 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -410,7 +410,7 @@ pub fn encode_test_event( TestEvent::Passthrough(event) => { // Encode the event normally. encoder - .encode(event.into_event(), buf) + .encode(event, buf) .expect("should not fail to encode input event"); } TestEvent::Modified { event, .. } => { @@ -431,7 +431,7 @@ pub fn encode_test_event( }; alt_encoder - .encode(event.into_event(), buf) + .encode(event, buf) .expect("should not fail to encode input event"); } } diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 2b9fc3c542ccb..f382e3b2c6277 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -13,7 +13,7 @@ use vector_core::{config::DataType, event::Event}; use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming}; -pub use self::event::{EventData, TestEvent}; +pub use self::event::{RawTestEvent, TestEvent}; pub use self::http::{encode_test_event, HttpResourceConfig}; use super::sync::{Configuring, TaskCoordinator}; diff --git a/src/components/validation/runner/io.rs b/src/components/validation/runner/io.rs index c454ba433c847..d2df83e754f6e 100644 --- a/src/components/validation/runner/io.rs +++ b/src/components/validation/runner/io.rs @@ -99,8 +99,12 @@ impl InputEdge { started.mark_as_done(); while let Some(test_event) = rx.recv().await { + let event = match test_event { + TestEvent::Passthrough(e) => e, + TestEvent::Modified { modified: _, event } => event, + }; let request = PushEventsRequest { - events: vec![test_event.into_event().into()], + events: vec![event.into()], }; if let Err(e) = client.push_events(request).await { diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 2c2a066b17806..d2d483e48bd0a 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -271,7 +271,7 @@ impl Runner { // like the aforementioned unit tests, switch to any improved mechanism we come up with // in the future to make these tests more deterministic and waste less time waiting // around if we can avoid it. - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(2)).await; let input_events = test_case.events.clone(); let input_driver = tokio::spawn(async move { diff --git a/src/components/validation/runner/telemetry.rs b/src/components/validation/runner/telemetry.rs index 9ef813838f1dc..6b5bb784291ff 100644 --- a/src/components/validation/runner/telemetry.rs +++ b/src/components/validation/runner/telemetry.rs @@ -23,6 +23,8 @@ const INTERNAL_LOGS_KEY: &str = "_telemetry_logs"; const INTERNAL_METRICS_KEY: &str = "_telemetry_metrics"; const VECTOR_SINK_KEY: &str = "_telemetry_out"; +const SHUTDOWN_TICKS: u8 = 2; + // The metrics event to monitor for before shutting down a telemetry collector. const INTERNAL_METRICS_SHUTDOWN_EVENT: &str = "component_received_events_total"; @@ -113,28 +115,40 @@ impl Telemetry { let mut events_seen = 0; let current_time = chrono::Utc::now(); + let timeout = tokio::time::sleep(Duration::from_secs(10)); + tokio::pin!(timeout); + loop { - match &rx.recv().await { - None => break 'outer, - Some(telemetry_event) => { - telemetry_events.push(telemetry_event.clone()); - if let Event::Metric(metric) = telemetry_event { - if let Some(tags) = metric.tags() { - if metric.name() == INTERNAL_METRICS_SHUTDOWN_EVENT && - tags.get("component_name") == Some(INTERNAL_LOGS_KEY) && - metric.data().timestamp().unwrap() > ¤t_time { - debug!("Telemetry: processed one component_received_events_total event."); - - events_seen += 1; - if events_seen == 2 { - break 'outer; + select! { + d = rx.recv() => { + match d { + None => break, + Some(telemetry_event) => { + telemetry_events.push(telemetry_event.clone()); + if let Event::Metric(metric) = telemetry_event { + if let Some(tags) = metric.tags() { + if metric.name() == INTERNAL_METRICS_SHUTDOWN_EVENT && + tags.get("component_name") == Some(INTERNAL_LOGS_KEY) && + metric.data().timestamp().unwrap() > ¤t_time { + debug!("Telemetry: processed one component_received_events_total event."); + + events_seen += 1; + if events_seen == SHUTDOWN_TICKS { + break; + } + } } } } } - } + }, + _ = &mut timeout => break, } } + if events_seen != SHUTDOWN_TICKS { + panic!("did not receive {SHUTDOWN_TICKS} events while waiting for shutdown! found {events_seen}"); + } + break 'outer; }, maybe_telemetry_event = rx.recv() => match maybe_telemetry_event { None => break, diff --git a/src/components/validation/validators/component_spec/mod.rs b/src/components/validation/validators/component_spec/mod.rs index e566403d95bf9..fe080f684d96c 100644 --- a/src/components/validation/validators/component_spec/mod.rs +++ b/src/components/validation/validators/component_spec/mod.rs @@ -8,7 +8,7 @@ use crate::components::validation::{ use super::Validator; -use self::sources::{validate_sources, SourceMetrics}; +use self::sources::{validate_sources, SourceMetricType}; /// Validates that the component meets the requirements of the [Component Specification][component_spec]. /// @@ -128,7 +128,7 @@ fn validate_telemetry( fn filter_events_by_metric_and_component<'a>( telemetry_events: &'a [Event], - metric: SourceMetrics, + metric_type: &SourceMetricType, component_name: &'a str, ) -> Result, Vec> { let metrics: Vec<&Metric> = telemetry_events @@ -141,7 +141,7 @@ fn filter_events_by_metric_and_component<'a>( } }) .filter(|&m| { - if m.name() == metric.to_string() { + if m.name() == metric_type.to_string() { if let Some(tags) = m.tags() { if tags.get("component_name").unwrap_or("") == component_name { return true; @@ -153,10 +153,14 @@ fn filter_events_by_metric_and_component<'a>( }) .collect(); - debug!("{}: {} metrics found.", metric.to_string(), metrics.len(),); + debug!( + "{}: {} metrics found.", + metric_type.to_string(), + metrics.len(), + ); if metrics.is_empty() { - return Err(vec![format!("{}: no metrics were emitted.", metric)]); + return Err(vec![format!("{}: no metrics were emitted.", metric_type)]); } Ok(metrics) diff --git a/src/components/validation/validators/component_spec/sources.rs b/src/components/validation/validators/component_spec/sources.rs index c25b217a399e4..d4dbc5283ccb6 100644 --- a/src/components/validation/validators/component_spec/sources.rs +++ b/src/components/validation/validators/component_spec/sources.rs @@ -1,7 +1,6 @@ use std::fmt::{Display, Formatter}; use bytes::BytesMut; -use vector_common::json_size::JsonSize; use vector_core::event::{Event, MetricKind}; use vector_core::EstimatedJsonEncodedSizeOf; @@ -11,7 +10,7 @@ use super::filter_events_by_metric_and_component; const TEST_SOURCE_NAME: &str = "test_source"; -pub enum SourceMetrics { +pub enum SourceMetricType { EventsReceived, EventsReceivedBytes, ReceivedBytesTotal, @@ -19,18 +18,24 @@ pub enum SourceMetrics { SentEventBytesTotal, } -impl SourceMetrics { +impl SourceMetricType { const fn name(&self) -> &'static str { match self { - SourceMetrics::EventsReceived => "component_received_events_total", - SourceMetrics::EventsReceivedBytes => "component_received_event_bytes_total", - SourceMetrics::ReceivedBytesTotal => "component_received_bytes_total", - SourceMetrics::SentEventsTotal => "component_sent_events_total", - SourceMetrics::SentEventBytesTotal => "component_sent_event_bytes_total", + SourceMetricType::EventsReceived => "component_received_events_total", + SourceMetricType::EventsReceivedBytes => "component_received_event_bytes_total", + SourceMetricType::ReceivedBytesTotal => "component_received_bytes_total", + SourceMetricType::SentEventsTotal => "component_sent_events_total", + SourceMetricType::SentEventBytesTotal => "component_sent_event_bytes_total", } } } +impl Display for SourceMetricType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +} + pub fn validate_sources( configuration: &ValidationConfiguration, inputs: &[TestEvent], @@ -62,63 +67,68 @@ pub fn validate_sources( } } -impl Display for SourceMetrics { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.name()) - } -} +fn sum_counters( + metric_name: &SourceMetricType, + metrics: &[&vector_core::event::Metric], +) -> Result> { + let mut sum: f64 = 0.0; + let mut errs = Vec::new(); -fn validate_component_received_events_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], - telemetry_events: &[Event], -) -> Result, Vec> { - let mut errs: Vec = Vec::new(); - - let metrics = filter_events_by_metric_and_component( - telemetry_events, - SourceMetrics::EventsReceived, - TEST_SOURCE_NAME, - )?; - - let mut events = 0; for m in metrics { match m.value() { vector_core::event::MetricValue::Counter { value } => { if let MetricKind::Absolute = m.data().kind { - events = *value as i32 + sum = *value; } else { - events += *value as i32 + sum += *value; } } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::EventsReceived, - )), + _ => errs.push(format!("{}: metric value is not a counter", metric_name,)), } } + if errs.is_empty() { + Ok(sum) + } else { + Err(errs) + } +} + +fn validate_events_total( + inputs: &[TestEvent], + telemetry_events: &[Event], + metric_type: &SourceMetricType, + passthrough: bool, +) -> Result, Vec> { + let mut errs: Vec = Vec::new(); + + let metrics = + filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); + + let events: i32 = sum_counters(metric_type, &metrics)? as i32; + let expected_events = inputs.iter().fold(0, |acc, i| { - if let TestEvent::Passthrough(_) = i { - return acc + 1; + if passthrough { + if let TestEvent::Passthrough(_) = i { + return acc + 1; + } + } else { + if let TestEvent::Modified { .. } = i { + return acc + 1; + } } acc }); debug!( "{}: {} events, {} expected events.", - SourceMetrics::EventsReceived, - events, - expected_events, + metric_type, events, expected_events, ); if events != expected_events { errs.push(format!( "{}: expected {} events, but received {}", - SourceMetrics::EventsReceived, - expected_events, - events + metric_type, expected_events, events )); } @@ -126,66 +136,30 @@ fn validate_component_received_events_total( return Err(errs); } - Ok(vec![format!( - "{}: {}", - SourceMetrics::EventsReceived, - events, - )]) + Ok(vec![format!("{}: {}", metric_type, events,)]) } -fn validate_component_received_event_bytes_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], +fn validate_bytes_total( telemetry_events: &[Event], + metric_type: &SourceMetricType, + expected_bytes: usize, ) -> Result, Vec> { let mut errs: Vec = Vec::new(); - let metrics = filter_events_by_metric_and_component( - telemetry_events, - SourceMetrics::EventsReceivedBytes, - TEST_SOURCE_NAME, - )?; - - let mut metric_bytes: f64 = 0.0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - metric_bytes = *value - } else { - metric_bytes += value - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::EventsReceivedBytes, - )), - } - } - - let expected_bytes = inputs.iter().fold(JsonSize::new(0), |acc, i| { - if let TestEvent::Passthrough(_) = i { - let size = vec![i.clone().into_event()].estimated_json_encoded_size_of(); - return acc + size; - } + let metrics = + filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME)?; - acc - }); + let metric_bytes = sum_counters(metric_type, &metrics)?; debug!( "{}: {} bytes, {} expected bytes.", - SourceMetrics::EventsReceivedBytes, - metric_bytes, - expected_bytes, + metric_type, metric_bytes, expected_bytes, ); - if JsonSize::new(metric_bytes as usize) != expected_bytes { + if metric_bytes != expected_bytes as f64 { errs.push(format!( "{}: expected {} bytes, but received {}", - SourceMetrics::EventsReceivedBytes, - expected_bytes, - metric_bytes + metric_type, expected_bytes, metric_bytes )); } @@ -193,83 +167,82 @@ fn validate_component_received_event_bytes_total( return Err(errs); } - Ok(vec![format!( - "{}: {}", - SourceMetrics::EventsReceivedBytes, - metric_bytes, - )]) + Ok(vec![format!("{}: {}", metric_type, metric_bytes,)]) } -fn validate_component_received_bytes_total( - configuration: &ValidationConfiguration, +fn validate_component_received_events_total( + _configuration: &ValidationConfiguration, inputs: &[TestEvent], _outputs: &[Event], telemetry_events: &[Event], ) -> Result, Vec> { - let mut errs: Vec = Vec::new(); - - let metrics = filter_events_by_metric_and_component( + validate_events_total( + inputs, telemetry_events, - SourceMetrics::ReceivedBytesTotal, - TEST_SOURCE_NAME, - )?; + &SourceMetricType::EventsReceived, + true, + ) +} - let mut metric_bytes: f64 = 0.0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - metric_bytes = *value - } else { - metric_bytes += value - } +fn validate_component_received_event_bytes_total( + _configuration: &ValidationConfiguration, + inputs: &[TestEvent], + _outputs: &[Event], + telemetry_events: &[Event], +) -> Result, Vec> { + let expected_bytes = inputs.iter().fold(0, |acc, i| { + if let TestEvent::Passthrough(e) = i { + match e { + Event::Log(log_event) => info!("event bytes total. test event: {:?}", log_event), + Event::Metric(_) => todo!(), + Event::Trace(_) => todo!(), } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::ReceivedBytesTotal, - )), + let size = vec![e.clone()].estimated_json_encoded_size_of(); + return acc + size; } - } + acc + }); + + validate_bytes_total( + telemetry_events, + &SourceMetricType::EventsReceivedBytes, + expected_bytes, + ) +} + +fn validate_component_received_bytes_total( + configuration: &ValidationConfiguration, + inputs: &[TestEvent], + _outputs: &[Event], + telemetry_events: &[Event], +) -> Result, Vec> { let mut expected_bytes = 0; if let Some(c) = &configuration.external_resource { let mut encoder = c.codec.into_encoder(); for i in inputs { + let event = match i { + TestEvent::Passthrough(e) => e, + TestEvent::Modified { modified: _, event } => event, + }; + match event { + Event::Log(log_event) => { + info!(" received bytes total. test event: {:?}", log_event) + } + Event::Metric(_) => todo!(), + Event::Trace(_) => todo!(), + } let mut buffer = BytesMut::new(); encode_test_event(&mut encoder, &mut buffer, i.clone()); expected_bytes += buffer.len() } } - debug!( - "{}: {} bytes, expected at least {} bytes.", - SourceMetrics::ReceivedBytesTotal, - metric_bytes, + validate_bytes_total( + telemetry_events, + &SourceMetricType::ReceivedBytesTotal, expected_bytes, - ); - - // We'll just establish a lower bound because we can't guarantee that the - // source will receive an exact number of bytes, since we can't synchronize - // with its internal logic. For example, some sources push or pull metrics - // on a schedule (http_client). - if metric_bytes < expected_bytes as f64 { - errs.push(format!( - "{}: expected at least {} bytes, but received {}", - SourceMetrics::ReceivedBytesTotal, - expected_bytes, - metric_bytes - )); - } - - if !errs.is_empty() { - return Err(errs); - } - - Ok(vec![format!( - "{}: {}", - SourceMetrics::ReceivedBytesTotal, - metric_bytes, - )]) + ) } fn validate_component_sent_events_total( @@ -278,63 +251,12 @@ fn validate_component_sent_events_total( _outputs: &[Event], telemetry_events: &[Event], ) -> Result, Vec> { - let mut errs: Vec = Vec::new(); - - let metrics = filter_events_by_metric_and_component( + validate_events_total( + inputs, telemetry_events, - SourceMetrics::SentEventsTotal, - TEST_SOURCE_NAME, - )?; - - let mut events = 0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - events = *value as i32 - } else { - events += *value as i32 - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::SentEventsTotal, - )), - } - } - - let expected_events = inputs.iter().fold(0, |acc, i| { - if let TestEvent::Passthrough(_) = i { - return acc + 1; - } - acc - }); - - debug!( - "{}: {} events, {} expected events.", - SourceMetrics::SentEventsTotal, - events, - expected_events, - ); - - if events != expected_events { - errs.push(format!( - "{}: expected {} events, but received {}", - SourceMetrics::SentEventsTotal, - inputs.len(), - events - )); - } - - if !errs.is_empty() { - return Err(errs); - } - - Ok(vec![format!( - "{}: {}", - SourceMetrics::SentEventsTotal, - events, - )]) + &SourceMetricType::SentEventsTotal, + true, + ) } fn validate_component_sent_event_bytes_total( @@ -343,59 +265,14 @@ fn validate_component_sent_event_bytes_total( outputs: &[Event], telemetry_events: &[Event], ) -> Result, Vec> { - let mut errs: Vec = Vec::new(); - - let metrics = filter_events_by_metric_and_component( - telemetry_events, - SourceMetrics::SentEventBytesTotal, - TEST_SOURCE_NAME, - )?; - - let mut metric_bytes: f64 = 0.0; - for m in metrics { - match m.value() { - vector_core::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - metric_bytes = *value - } else { - metric_bytes += value - } - } - _ => errs.push(format!( - "{}: metric value is not a counter", - SourceMetrics::SentEventBytesTotal, - )), - } - } - - let mut expected_bytes = JsonSize::zero(); + let mut expected_bytes = 0; for e in outputs { expected_bytes += vec![e].estimated_json_encoded_size_of(); } - debug!( - "{}: {} bytes, {} expected bytes.", - SourceMetrics::SentEventBytesTotal, - metric_bytes, + validate_bytes_total( + telemetry_events, + &SourceMetricType::SentEventBytesTotal, expected_bytes, - ); - - if JsonSize::new(metric_bytes as usize) != expected_bytes { - errs.push(format!( - "{}: expected {} bytes, but received {}.", - SourceMetrics::SentEventBytesTotal, - expected_bytes, - metric_bytes - )); - } - - if !errs.is_empty() { - return Err(errs); - } - - Ok(vec![format!( - "{}: {}", - SourceMetrics::SentEventBytesTotal, - metric_bytes, - )]) + ) } From 1e0e6e7bddb5bad5c22f2792e217056b54c67713 Mon Sep 17 00:00:00 2001 From: neuronull Date: Tue, 11 Jul 2023 11:13:06 -0600 Subject: [PATCH 02/11] fix compilation errors --- .../validators/component_spec/mod.rs | 18 +++++------------- .../validators/component_spec/sources.rs | 6 +++--- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/components/validation/validators/component_spec/mod.rs b/src/components/validation/validators/component_spec/mod.rs index fe080f684d96c..88122822f2c0f 100644 --- a/src/components/validation/validators/component_spec/mod.rs +++ b/src/components/validation/validators/component_spec/mod.rs @@ -128,9 +128,9 @@ fn validate_telemetry( fn filter_events_by_metric_and_component<'a>( telemetry_events: &'a [Event], - metric_type: &SourceMetricType, + metric: &SourceMetricType, component_name: &'a str, -) -> Result, Vec> { +) -> Vec<&'a Metric> { let metrics: Vec<&Metric> = telemetry_events .iter() .flat_map(|e| { @@ -141,7 +141,7 @@ fn filter_events_by_metric_and_component<'a>( } }) .filter(|&m| { - if m.name() == metric_type.to_string() { + if m.name() == metric.to_string() { if let Some(tags) = m.tags() { if tags.get("component_name").unwrap_or("") == component_name { return true; @@ -153,15 +153,7 @@ fn filter_events_by_metric_and_component<'a>( }) .collect(); - debug!( - "{}: {} metrics found.", - metric_type.to_string(), - metrics.len(), - ); + debug!("{}: {} metrics found.", metric.to_string(), metrics.len(),); - if metrics.is_empty() { - return Err(vec![format!("{}: no metrics were emitted.", metric_type)]); - } - - Ok(metrics) + metrics } diff --git a/src/components/validation/validators/component_spec/sources.rs b/src/components/validation/validators/component_spec/sources.rs index d4dbc5283ccb6..c6bea95fe9205 100644 --- a/src/components/validation/validators/component_spec/sources.rs +++ b/src/components/validation/validators/component_spec/sources.rs @@ -147,7 +147,7 @@ fn validate_bytes_total( let mut errs: Vec = Vec::new(); let metrics = - filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME)?; + filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); let metric_bytes = sum_counters(metric_type, &metrics)?; @@ -197,7 +197,7 @@ fn validate_component_received_event_bytes_total( Event::Metric(_) => todo!(), Event::Trace(_) => todo!(), } - let size = vec![e.clone()].estimated_json_encoded_size_of(); + let size = vec![e.clone()].estimated_json_encoded_size_of().get(); return acc + size; } @@ -267,7 +267,7 @@ fn validate_component_sent_event_bytes_total( ) -> Result, Vec> { let mut expected_bytes = 0; for e in outputs { - expected_bytes += vec![e].estimated_json_encoded_size_of(); + expected_bytes += vec![e].estimated_json_encoded_size_of().get(); } validate_bytes_total( From 63a9581205218677067a2bb78dbd546f444605dc Mon Sep 17 00:00:00 2001 From: neuronull Date: Tue, 11 Jul 2023 12:51:49 -0600 Subject: [PATCH 03/11] 3 ticks --- src/components/validation/runner/telemetry.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/components/validation/runner/telemetry.rs b/src/components/validation/runner/telemetry.rs index 6b5bb784291ff..3df924af3ea4f 100644 --- a/src/components/validation/runner/telemetry.rs +++ b/src/components/validation/runner/telemetry.rs @@ -23,7 +23,7 @@ const INTERNAL_LOGS_KEY: &str = "_telemetry_logs"; const INTERNAL_METRICS_KEY: &str = "_telemetry_metrics"; const VECTOR_SINK_KEY: &str = "_telemetry_out"; -const SHUTDOWN_TICKS: u8 = 2; +const SHUTDOWN_TICKS: u8 = 3; // The metrics event to monitor for before shutting down a telemetry collector. const INTERNAL_METRICS_SHUTDOWN_EVENT: &str = "component_received_events_total"; @@ -115,7 +115,7 @@ impl Telemetry { let mut events_seen = 0; let current_time = chrono::Utc::now(); - let timeout = tokio::time::sleep(Duration::from_secs(10)); + let timeout = tokio::time::sleep(Duration::from_secs(5)); tokio::pin!(timeout); loop { From c6af43eb13ab1596132aedee4095aa65f9a9bbc2 Mon Sep 17 00:00:00 2001 From: neuronull Date: Wed, 12 Jul 2023 11:15:08 -0600 Subject: [PATCH 04/11] dont compute expected metrics in validator --- src/components/validation/mod.rs | 13 ++ src/components/validation/resources/event.rs | 77 ++++++--- src/components/validation/resources/http.rs | 53 +----- src/components/validation/resources/mod.rs | 4 +- src/components/validation/runner/mod.rs | 159 +++++++++++++++--- .../validators/component_spec/mod.rs | 20 +-- .../validators/component_spec/sources.rs | 128 +++++--------- src/components/validation/validators/mod.rs | 5 +- 8 files changed, 255 insertions(+), 204 deletions(-) diff --git a/src/components/validation/mod.rs b/src/components/validation/mod.rs index 7457cd6548ad9..dd335140b97f5 100644 --- a/src/components/validation/mod.rs +++ b/src/components/validation/mod.rs @@ -170,6 +170,19 @@ macro_rules! register_validatable_component { }; } +/// Input and Output runners populate this structure as they send and receive events. +/// The structure is passed into the validator to use as the expected values for the +/// metrics that the components under test actually output. +#[derive(Default)] +pub struct RunnerMetrics { + pub received_events_total: u64, + pub received_event_bytes_total: u64, + pub received_bytes_total: u64, + pub sent_bytes_total: u64, // a reciprocal for received_bytes_total + pub sent_event_bytes_total: u64, + pub sent_event_total: u64, +} + #[cfg(all(test, feature = "component-validation-tests"))] mod tests { use std::{ diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index fe61f782bafbd..49bb8420b4944 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -1,8 +1,16 @@ +use bytes::BytesMut; use serde::Deserialize; use snafu::Snafu; +use tokio_util::codec::Encoder as _; + +use crate::codecs::Encoder; +use codecs::{ + encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues, + NewlineDelimitedEncoder, +}; use vector_core::event::{Event, LogEvent}; -/// An raw test case event for deserialization from yaml file. +/// An test case event for deserialization from yaml file. /// This is an intermediary step to TestEvent. #[derive(Clone, Debug, Deserialize)] #[serde(untagged)] @@ -22,6 +30,22 @@ pub enum RawTestEvent { Modified { modified: bool, event: EventData }, } +#[derive(Clone, Debug, Deserialize)] +#[serde(untagged)] +pub enum EventData { + /// A log event. + Log(String), +} + +impl EventData { + /// Converts this event data into an `Event`. + pub fn into_event(self) -> Event { + match self { + Self::Log(message) => Event::Log(LogEvent::from_bytes_legacy(&message.into())), + } + } +} + /// An event used in a test case. #[derive(Clone, Debug, Deserialize)] #[serde(try_from = "RawTestEvent")] @@ -42,15 +66,6 @@ pub enum TestEvent { Modified { modified: bool, event: Event }, } -// impl TestEvent { -// pub fn into_event(self) -> Event { -// match self { -// Self::Passthrough(event) => event.into_event(), -// Self::Modified { event, .. } => event.into_event(), -// } -// } -// } - #[derive(Clone, Debug, Eq, PartialEq, Snafu)] pub enum RawTestEventParseError {} @@ -70,18 +85,38 @@ impl TryFrom for TestEvent { } } -#[derive(Clone, Debug, Deserialize)] -#[serde(untagged)] -pub enum EventData { - /// A log event. - Log(String), -} +pub fn encode_test_event( + encoder: &mut Encoder, + buf: &mut BytesMut, + event: TestEvent, +) { + match event { + TestEvent::Passthrough(event) => { + // Encode the event normally. + encoder + .encode(event, buf) + .expect("should not fail to encode input event"); + } + TestEvent::Modified { event, .. } => { + // This is a little fragile, but we check what serializer this encoder uses, and based + // on `Serializer::supports_json`, we choose an opposing codec. For example, if the + // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise + // versa. + let mut alt_encoder = if encoder.serializer().supports_json() { + Encoder::::new( + LengthDelimitedEncoder::new().into(), + LogfmtSerializer::new().into(), + ) + } else { + Encoder::::new( + NewlineDelimitedEncoder::new().into(), + JsonSerializer::new(MetricTagValues::default()).into(), + ) + }; -impl EventData { - /// Converts this event data into an `Event`. - pub fn into_event(self) -> Event { - match self { - Self::Log(message) => Event::Log(LogEvent::from_bytes_legacy(&message.into())), + alt_encoder + .encode(event, buf) + .expect("should not fail to encode input event"); } } } diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index 64ca6e04fd4bd..5b88234788381 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -1,5 +1,6 @@ use std::{ collections::VecDeque, + future::Future, net::{IpAddr, SocketAddr}, str::FromStr, sync::Arc, @@ -11,26 +12,18 @@ use axum::{ Router, }; use bytes::BytesMut; -use codecs::{ - encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues, - NewlineDelimitedEncoder, -}; use http::{Method, Request, StatusCode, Uri}; use hyper::{Body, Client, Server}; -use std::future::Future; use tokio::{ select, sync::{mpsc, oneshot, Mutex, Notify}, }; -use tokio_util::codec::{Decoder, Encoder as _}; -use vector_core::event::Event; +use tokio_util::codec::Decoder; -use crate::{ - codecs::Encoder, - components::validation::sync::{Configuring, TaskCoordinator}, -}; +use crate::components::validation::sync::{Configuring, TaskCoordinator}; +use vector_core::event::Event; -use super::{ResourceCodec, ResourceDirection, TestEvent}; +use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent}; /// An HTTP resource. #[derive(Clone)] @@ -400,39 +393,3 @@ fn socketaddr_from_uri(uri: &Uri) -> SocketAddr { SocketAddr::from((uri_host, uri_port)) } - -pub fn encode_test_event( - encoder: &mut Encoder, - buf: &mut BytesMut, - event: TestEvent, -) { - match event { - TestEvent::Passthrough(event) => { - // Encode the event normally. - encoder - .encode(event, buf) - .expect("should not fail to encode input event"); - } - TestEvent::Modified { event, .. } => { - // This is a little fragile, but we check what serializer this encoder uses, and based - // on `Serializer::supports_json`, we choose an opposing codec. For example, if the - // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise - // versa. - let mut alt_encoder = if encoder.serializer().supports_json() { - Encoder::::new( - LengthDelimitedEncoder::new().into(), - LogfmtSerializer::new().into(), - ) - } else { - Encoder::::new( - NewlineDelimitedEncoder::new().into(), - JsonSerializer::new(MetricTagValues::default()).into(), - ) - }; - - alt_encoder - .encode(event, buf) - .expect("should not fail to encode input event"); - } - } -} diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index f382e3b2c6277..f957bedc47954 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -13,8 +13,8 @@ use vector_core::{config::DataType, event::Event}; use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming}; -pub use self::event::{RawTestEvent, TestEvent}; -pub use self::http::{encode_test_event, HttpResourceConfig}; +pub use self::event::{encode_test_event, RawTestEvent, TestEvent}; +pub use self::http::HttpResourceConfig; use super::sync::{Configuring, TaskCoordinator}; diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index d2d483e48bd0a..3f13b20543557 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -2,18 +2,34 @@ pub mod config; mod io; mod telemetry; -use std::{collections::HashMap, path::PathBuf, time::Duration}; +use std::{ + collections::HashMap, + path::PathBuf, + sync::{Arc, Mutex}, + time::Duration, +}; + +use bytes::BytesMut; +use tokio::{ + runtime::Builder, + select, + sync::mpsc::{self, Receiver, Sender}, + task::JoinHandle, +}; +use tokio_util::codec::Encoder as _; -use tokio::{runtime::Builder, select, sync::mpsc}; -use vector_core::event::Event; +use codecs::encoding; +use vector_core::{event::Event, EstimatedJsonEncodedSizeOf}; use crate::{ - components::validation::TestCase, + codecs::Encoder, + components::validation::{RunnerMetrics, TestCase}, config::{ConfigBuilder, ConfigDiff}, topology, }; use super::{ + encode_test_event, sync::{Configuring, TaskCoordinator}, ComponentType, TestCaseExpectation, TestEvent, ValidationConfiguration, Validator, }; @@ -216,6 +232,11 @@ impl Runner { debug!("Component topology configuration built and telemetry collector spawned."); + // Create the data structure that the input and output runners will use to store + // their receivent/sent metrics. This is then shared with the Validator for comparison + // against the actual metrics output by the component under test. + let runner_metrics = Arc::new(Mutex::new(RunnerMetrics::default())); + // After that, we'll build the external resource necessary for this component, if any. // Once that's done, we build the input event/output event sender and receiver based on // whatever we spawned for an external resource. @@ -226,13 +247,13 @@ impl Runner { // For example, if we're validating a source, we would have added a filler sink for our // controlled output edge, which means we then need a server task listening for the // events sent by that sink. - let (runner_input, runner_output) = build_external_resource( + let (runner_input, runner_output, maybe_runner_encoder) = build_external_resource( &self.configuration, &input_task_coordinator, &output_task_coordinator, ); let input_tx = runner_input.into_sender(controlled_edges.input); - let mut output_rx = runner_output.into_receiver(controlled_edges.output); + let output_rx = runner_output.into_receiver(controlled_edges.output); debug!("External resource (if any) and controlled edges built and spawned."); // Now with any external resource spawned, as well as any tasks for handling controlled @@ -273,23 +294,18 @@ impl Runner { // around if we can avoid it. tokio::time::sleep(Duration::from_secs(2)).await; - let input_events = test_case.events.clone(); - let input_driver = tokio::spawn(async move { - for input_event in input_events { - input_tx - .send(input_event) - .await - .expect("input channel should not be closed"); - } - }); + let input_driver = spawn_input_driver( + test_case.events.clone(), + input_tx, + &runner_metrics, + maybe_runner_encoder.as_ref().map(|encoder| encoder.clone()), + ); - let output_driver = tokio::spawn(async move { - let mut output_events = Vec::new(); - while let Some(output_event) = output_rx.recv().await { - output_events.push(output_event); - } - output_events - }); + let output_driver = spawn_output_driver( + output_rx, + &runner_metrics, + maybe_runner_encoder.as_ref().map(|encoder| encoder.clone()), + ); // At this point, the component topology is running, and all input/output/telemetry // tasks are running as well. Our input driver should be sending (or will have already @@ -334,12 +350,12 @@ impl Runner { .values() .map(|validator| { validator.check_validation( - self.configuration.clone(), component_type, expectation, &input_events, &output_events, &telemetry_events, + &runner_metrics.lock().unwrap(), ) }) .collect(); @@ -397,9 +413,12 @@ fn build_external_resource( configuration: &ValidationConfiguration, input_task_coordinator: &TaskCoordinator, output_task_coordinator: &TaskCoordinator, -) -> (RunnerInput, RunnerOutput) { +) -> (RunnerInput, RunnerOutput, Option>) { let component_type = configuration.component_type(); let maybe_external_resource = configuration.external_resource(); + let maybe_encoder = maybe_external_resource + .as_ref() + .map(|resource| resource.codec.into_encoder()); match component_type { ComponentType::Source => { // As an external resource for a source, we create a channel that the validation runner @@ -411,11 +430,15 @@ fn build_external_resource( maybe_external_resource.expect("a source must always have an external resource"); resource.spawn_as_input(rx, input_task_coordinator); - (RunnerInput::External(tx), RunnerOutput::Controlled) + ( + RunnerInput::External(tx), + RunnerOutput::Controlled, + maybe_encoder, + ) } ComponentType::Transform => { // Transforms have no external resources. - (RunnerInput::Controlled, RunnerOutput::Controlled) + (RunnerInput::Controlled, RunnerOutput::Controlled, None) } ComponentType::Sink => { // As an external resource for a sink, we create a channel that the validation runner @@ -427,7 +450,11 @@ fn build_external_resource( maybe_external_resource.expect("a sink must always have an external resource"); resource.spawn_as_output(tx, output_task_coordinator); - (RunnerInput::Controlled, RunnerOutput::External(rx)) + ( + RunnerInput::Controlled, + RunnerOutput::External(rx), + maybe_encoder, + ) } } } @@ -483,6 +510,84 @@ fn spawn_component_topology( }); } +fn spawn_input_driver( + input_events: Vec, + input_tx: Sender, + runner_metrics: &Arc>, + mut maybe_encoder: Option>, +) -> JoinHandle<()> { + let input_runner_metrics = Arc::clone(&runner_metrics); + + tokio::spawn(async move { + for input_event in input_events { + input_tx + .send(input_event.clone()) + .await + .expect("input channel should not be closed"); + + // Update the runner metrics for the sent event. This will later + // be used in the Validators, as the "expected" case. + let mut input_runner_metrics = input_runner_metrics.lock().unwrap(); + + if let Some(mut encoder) = maybe_encoder.as_mut() { + let mut buffer = BytesMut::new(); + encode_test_event(&mut encoder, &mut buffer, input_event.clone()); + + input_runner_metrics.sent_bytes_total += buffer.len() as u64; + } + + let (modified, event) = match input_event { + TestEvent::Passthrough(event) => (false, event), + TestEvent::Modified { modified, event } => (modified, event), + }; + + // account for failure case + if !modified { + input_runner_metrics.sent_event_total += 1; + + input_runner_metrics.sent_event_bytes_total += + vec![event].estimated_json_encoded_size_of().get() as u64; + } + } + }) +} + +fn spawn_output_driver( + mut output_rx: Receiver, + runner_metrics: &Arc>, + maybe_encoder: Option>, +) -> JoinHandle> { + let output_runner_metrics = Arc::clone(&runner_metrics); + + tokio::spawn(async move { + let mut output_events = Vec::new(); + while let Some(output_event) = output_rx.recv().await { + output_events.push(output_event.clone()); + + // Update the runner metrics for the received event. This will later + // be used in the Validators, as the "expected" case. + let mut output_runner_metrics = output_runner_metrics.lock().unwrap(); + + output_runner_metrics.received_events_total += 1; + output_runner_metrics.received_event_bytes_total += vec![output_event.clone()] + .estimated_json_encoded_size_of() + .get() as u64; + + if let Some(encoder) = maybe_encoder.as_ref() { + let mut buffer = BytesMut::new(); + //encode_test_event(&mut encoder, &mut buffer, output_event); + encoder + .clone() + .encode(output_event, &mut buffer) + .expect("should not fail to encode output event"); + + output_runner_metrics.received_bytes_total += buffer.len() as u64; + } + } + output_events + }) +} + fn initialize_test_environment() { // Make sure our metrics recorder is installed and in test mode. This is necessary for // proper internal telemetry collect when running the component topology, even though it's diff --git a/src/components/validation/validators/component_spec/mod.rs b/src/components/validation/validators/component_spec/mod.rs index 88122822f2c0f..36aa2d4452b05 100644 --- a/src/components/validation/validators/component_spec/mod.rs +++ b/src/components/validation/validators/component_spec/mod.rs @@ -2,9 +2,7 @@ mod sources; use vector_core::event::{Event, Metric}; -use crate::components::validation::{ - ComponentType, TestCaseExpectation, TestEvent, ValidationConfiguration, -}; +use crate::components::validation::{ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent}; use super::Validator; @@ -28,12 +26,12 @@ impl Validator for ComponentSpecValidator { fn check_validation( &self, - configuration: ValidationConfiguration, component_type: ComponentType, expectation: TestCaseExpectation, inputs: &[TestEvent], outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { for input in inputs { debug!("Validator observed input event: {:?}", input); @@ -84,13 +82,7 @@ impl Validator for ComponentSpecValidator { format!("received {} telemetry events", telemetry_events.len()), ]; - let out = validate_telemetry( - configuration, - component_type, - inputs, - outputs, - telemetry_events, - )?; + let out = validate_telemetry(component_type, telemetry_events, runner_metrics)?; run_out.extend(out); Ok(run_out) @@ -98,18 +90,16 @@ impl Validator for ComponentSpecValidator { } fn validate_telemetry( - configuration: ValidationConfiguration, component_type: ComponentType, - inputs: &[TestEvent], - outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { let mut out: Vec = Vec::new(); let mut errs: Vec = Vec::new(); match component_type { ComponentType::Source => { - let result = validate_sources(&configuration, inputs, outputs, telemetry_events); + let result = validate_sources(telemetry_events, runner_metrics); match result { Ok(o) => out.extend(o), Err(e) => errs.extend(e), diff --git a/src/components/validation/validators/component_spec/sources.rs b/src/components/validation/validators/component_spec/sources.rs index c6bea95fe9205..6298e241a8cde 100644 --- a/src/components/validation/validators/component_spec/sources.rs +++ b/src/components/validation/validators/component_spec/sources.rs @@ -1,10 +1,8 @@ use std::fmt::{Display, Formatter}; -use bytes::BytesMut; use vector_core::event::{Event, MetricKind}; -use vector_core::EstimatedJsonEncodedSizeOf; -use crate::components::validation::{encode_test_event, TestEvent, ValidationConfiguration}; +use crate::components::validation::RunnerMetrics; use super::filter_events_by_metric_and_component; @@ -37,10 +35,8 @@ impl Display for SourceMetricType { } pub fn validate_sources( - configuration: &ValidationConfiguration, - inputs: &[TestEvent], - outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { let mut out: Vec = Vec::new(); let mut errs: Vec = Vec::new(); @@ -54,7 +50,7 @@ pub fn validate_sources( ]; for v in validations.iter() { - match v(configuration, inputs, outputs, telemetry_events) { + match v(telemetry_events, runner_metrics) { Err(e) => errs.extend(e), Ok(m) => out.extend(m), } @@ -95,40 +91,26 @@ fn sum_counters( } fn validate_events_total( - inputs: &[TestEvent], telemetry_events: &[Event], metric_type: &SourceMetricType, - passthrough: bool, + expected_events: u64, ) -> Result, Vec> { let mut errs: Vec = Vec::new(); let metrics = filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); - let events: i32 = sum_counters(metric_type, &metrics)? as i32; - - let expected_events = inputs.iter().fold(0, |acc, i| { - if passthrough { - if let TestEvent::Passthrough(_) = i { - return acc + 1; - } - } else { - if let TestEvent::Modified { .. } = i { - return acc + 1; - } - } - acc - }); + let actual_events: u64 = sum_counters(metric_type, &metrics)? as u64; debug!( "{}: {} events, {} expected events.", - metric_type, events, expected_events, + metric_type, actual_events, expected_events, ); - if events != expected_events { + if actual_events != expected_events { errs.push(format!( "{}: expected {} events, but received {}", - metric_type, expected_events, events + metric_type, expected_events, actual_events )); } @@ -136,30 +118,30 @@ fn validate_events_total( return Err(errs); } - Ok(vec![format!("{}: {}", metric_type, events,)]) + Ok(vec![format!("{}: {}", metric_type, actual_events)]) } fn validate_bytes_total( telemetry_events: &[Event], metric_type: &SourceMetricType, - expected_bytes: usize, + expected_bytes: u64, ) -> Result, Vec> { let mut errs: Vec = Vec::new(); let metrics = filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); - let metric_bytes = sum_counters(metric_type, &metrics)?; + let actual_bytes: u64 = sum_counters(metric_type, &metrics)? as u64; debug!( "{}: {} bytes, {} expected bytes.", - metric_type, metric_bytes, expected_bytes, + metric_type, actual_bytes, expected_bytes, ); - if metric_bytes != expected_bytes as f64 { + if actual_bytes != expected_bytes { errs.push(format!( "{}: expected {} bytes, but received {}", - metric_type, expected_bytes, metric_bytes + metric_type, expected_bytes, actual_bytes )); } @@ -167,42 +149,31 @@ fn validate_bytes_total( return Err(errs); } - Ok(vec![format!("{}: {}", metric_type, metric_bytes,)]) + Ok(vec![format!("{}: {}", metric_type, actual_bytes)]) } fn validate_component_received_events_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { + // The reciprocal metric for events received is events sent, + // so the expected value is what the input runner sent. + let expected_events = runner_metrics.sent_event_total; + validate_events_total( - inputs, telemetry_events, &SourceMetricType::EventsReceived, - true, + expected_events, ) } fn validate_component_received_event_bytes_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { - let expected_bytes = inputs.iter().fold(0, |acc, i| { - if let TestEvent::Passthrough(e) = i { - match e { - Event::Log(log_event) => info!("event bytes total. test event: {:?}", log_event), - Event::Metric(_) => todo!(), - Event::Trace(_) => todo!(), - } - let size = vec![e.clone()].estimated_json_encoded_size_of().get(); - return acc + size; - } - - acc - }); + // The reciprocal metric for received_event_bytes is sent_event_bytes, + // so the expected value is what the input runner sent. + let expected_bytes = runner_metrics.sent_event_bytes_total; validate_bytes_total( telemetry_events, @@ -212,31 +183,12 @@ fn validate_component_received_event_bytes_total( } fn validate_component_received_bytes_total( - configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { - let mut expected_bytes = 0; - if let Some(c) = &configuration.external_resource { - let mut encoder = c.codec.into_encoder(); - for i in inputs { - let event = match i { - TestEvent::Passthrough(e) => e, - TestEvent::Modified { modified: _, event } => event, - }; - match event { - Event::Log(log_event) => { - info!(" received bytes total. test event: {:?}", log_event) - } - Event::Metric(_) => todo!(), - Event::Trace(_) => todo!(), - } - let mut buffer = BytesMut::new(); - encode_test_event(&mut encoder, &mut buffer, i.clone()); - expected_bytes += buffer.len() - } - } + // The reciprocal metric for received_bytes is sent_bytes, + // so the expected value is what the input runner sent. + let expected_bytes = runner_metrics.sent_bytes_total; validate_bytes_total( telemetry_events, @@ -246,29 +198,27 @@ fn validate_component_received_bytes_total( } fn validate_component_sent_events_total( - _configuration: &ValidationConfiguration, - inputs: &[TestEvent], - _outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { + // The reciprocal metric for events sent is events received, + // so the expected value is what the output runner received. + let expected_events = runner_metrics.received_events_total; + validate_events_total( - inputs, telemetry_events, &SourceMetricType::SentEventsTotal, - true, + expected_events, ) } fn validate_component_sent_event_bytes_total( - _configuration: &ValidationConfiguration, - _inputs: &[TestEvent], - outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec> { - let mut expected_bytes = 0; - for e in outputs { - expected_bytes += vec![e].estimated_json_encoded_size_of().get(); - } + // The reciprocal metric for sent_event_bytes is received_event_bytes, + // so the expected value is what the output runner received. + let expected_bytes = runner_metrics.received_event_bytes_total; validate_bytes_total( telemetry_events, diff --git a/src/components/validation/validators/mod.rs b/src/components/validation/validators/mod.rs index 8cb4c8945b16b..e563488d7f416 100644 --- a/src/components/validation/validators/mod.rs +++ b/src/components/validation/validators/mod.rs @@ -1,9 +1,10 @@ mod component_spec; + pub use self::component_spec::ComponentSpecValidator; use vector_core::event::Event; -use super::{ComponentType, TestCaseExpectation, TestEvent, ValidationConfiguration}; +use super::{ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent}; /// A component validator. /// @@ -19,12 +20,12 @@ pub trait Validator { /// provided as well. fn check_validation( &self, - configuration: ValidationConfiguration, component_type: ComponentType, expectation: TestCaseExpectation, inputs: &[TestEvent], outputs: &[Event], telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, ) -> Result, Vec>; } From 55a35183c5fde21b61b3fb6ef0aeb5c99171f188 Mon Sep 17 00:00:00 2001 From: neuronull Date: Wed, 12 Jul 2023 11:49:23 -0600 Subject: [PATCH 05/11] cleanup --- src/components/validation/resources/event.rs | 16 +++++++++++++++- src/components/validation/resources/mod.rs | 2 +- src/components/validation/runner/io.rs | 6 +----- src/components/validation/runner/mod.rs | 2 +- src/components/validation/runner/telemetry.rs | 2 +- .../validators/component_spec/sources.rs | 8 ++++---- 6 files changed, 23 insertions(+), 13 deletions(-) diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index 49bb8420b4944..eb49ee31d1db3 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -10,7 +10,7 @@ use codecs::{ }; use vector_core::event::{Event, LogEvent}; -/// An test case event for deserialization from yaml file. +/// A test case event for deserialization from yaml file. /// This is an intermediary step to TestEvent. #[derive(Clone, Debug, Deserialize)] #[serde(untagged)] @@ -47,6 +47,11 @@ impl EventData { } /// An event used in a test case. +/// It is important to have created the event with all fields, immediately after deserializing from the +/// test case definition yaml file. This ensures that the event data we are using in the expected/actual +/// metrics collection is based on the same event. Namely, one issue that can arise from creating the event +/// from the event data twice (once for the expected and once for actual), it can result in a timestamp in +/// the event which may or may not have the same millisecond precision as it's counterpart. #[derive(Clone, Debug, Deserialize)] #[serde(try_from = "RawTestEvent")] #[serde(untagged)] @@ -66,6 +71,15 @@ pub enum TestEvent { Modified { modified: bool, event: Event }, } +impl TestEvent { + pub fn into_event(self) -> Event { + match self { + Self::Passthrough(event) => event, + Self::Modified { event, .. } => event, + } + } +} + #[derive(Clone, Debug, Eq, PartialEq, Snafu)] pub enum RawTestEventParseError {} diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index f957bedc47954..22e59018da427 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -13,7 +13,7 @@ use vector_core::{config::DataType, event::Event}; use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming}; -pub use self::event::{encode_test_event, RawTestEvent, TestEvent}; +pub use self::event::{encode_test_event, TestEvent}; pub use self::http::HttpResourceConfig; use super::sync::{Configuring, TaskCoordinator}; diff --git a/src/components/validation/runner/io.rs b/src/components/validation/runner/io.rs index d2df83e754f6e..c454ba433c847 100644 --- a/src/components/validation/runner/io.rs +++ b/src/components/validation/runner/io.rs @@ -99,12 +99,8 @@ impl InputEdge { started.mark_as_done(); while let Some(test_event) = rx.recv().await { - let event = match test_event { - TestEvent::Passthrough(e) => e, - TestEvent::Modified { modified: _, event } => event, - }; let request = PushEventsRequest { - events: vec![event.into()], + events: vec![test_event.into_event().into()], }; if let Err(e) = client.push_events(request).await { diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 3f13b20543557..be0157fea8336 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -233,7 +233,7 @@ impl Runner { debug!("Component topology configuration built and telemetry collector spawned."); // Create the data structure that the input and output runners will use to store - // their receivent/sent metrics. This is then shared with the Validator for comparison + // their received/sent metrics. This is then shared with the Validator for comparison // against the actual metrics output by the component under test. let runner_metrics = Arc::new(Mutex::new(RunnerMetrics::default())); diff --git a/src/components/validation/runner/telemetry.rs b/src/components/validation/runner/telemetry.rs index 3df924af3ea4f..96772f4902c22 100644 --- a/src/components/validation/runner/telemetry.rs +++ b/src/components/validation/runner/telemetry.rs @@ -146,7 +146,7 @@ impl Telemetry { } } if events_seen != SHUTDOWN_TICKS { - panic!("did not receive {SHUTDOWN_TICKS} events while waiting for shutdown! found {events_seen}"); + panic!("did not receive {SHUTDOWN_TICKS} events while waiting for shutdown! Only found {events_seen}!"); } break 'outer; }, diff --git a/src/components/validation/validators/component_spec/sources.rs b/src/components/validation/validators/component_spec/sources.rs index 6298e241a8cde..04b8aa138dd98 100644 --- a/src/components/validation/validators/component_spec/sources.rs +++ b/src/components/validation/validators/component_spec/sources.rs @@ -66,7 +66,7 @@ pub fn validate_sources( fn sum_counters( metric_name: &SourceMetricType, metrics: &[&vector_core::event::Metric], -) -> Result> { +) -> Result> { let mut sum: f64 = 0.0; let mut errs = Vec::new(); @@ -84,7 +84,7 @@ fn sum_counters( } if errs.is_empty() { - Ok(sum) + Ok(sum as u64) } else { Err(errs) } @@ -100,7 +100,7 @@ fn validate_events_total( let metrics = filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); - let actual_events: u64 = sum_counters(metric_type, &metrics)? as u64; + let actual_events = sum_counters(metric_type, &metrics)?; debug!( "{}: {} events, {} expected events.", @@ -131,7 +131,7 @@ fn validate_bytes_total( let metrics = filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); - let actual_bytes: u64 = sum_counters(metric_type, &metrics)? as u64; + let actual_bytes = sum_counters(metric_type, &metrics)?; debug!( "{}: {} bytes, {} expected bytes.", From 8ec87b3a25d87707f112538ffef1108d9857e922 Mon Sep 17 00:00:00 2001 From: neuronull Date: Wed, 12 Jul 2023 11:54:08 -0600 Subject: [PATCH 06/11] cleanup --- src/components/validation/runner/mod.rs | 1 - src/components/validation/runner/telemetry.rs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index be0157fea8336..73beba89bf1a5 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -575,7 +575,6 @@ fn spawn_output_driver( if let Some(encoder) = maybe_encoder.as_ref() { let mut buffer = BytesMut::new(); - //encode_test_event(&mut encoder, &mut buffer, output_event); encoder .clone() .encode(output_event, &mut buffer) diff --git a/src/components/validation/runner/telemetry.rs b/src/components/validation/runner/telemetry.rs index 96772f4902c22..7238b39160628 100644 --- a/src/components/validation/runner/telemetry.rs +++ b/src/components/validation/runner/telemetry.rs @@ -146,7 +146,7 @@ impl Telemetry { } } if events_seen != SHUTDOWN_TICKS { - panic!("did not receive {SHUTDOWN_TICKS} events while waiting for shutdown! Only found {events_seen}!"); + panic!("Did not receive {SHUTDOWN_TICKS} events while waiting for shutdown! Only received {events_seen}!"); } break 'outer; }, From 4b3b721ea7e3732f0cd75a5d1f1c3af76d04f5fa Mon Sep 17 00:00:00 2001 From: neuronull Date: Wed, 12 Jul 2023 12:14:22 -0600 Subject: [PATCH 07/11] clippy --- src/components/validation/resources/event.rs | 1 + src/components/validation/runner/mod.rs | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index eb49ee31d1db3..beb9c244c2fbd 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -72,6 +72,7 @@ pub enum TestEvent { } impl TestEvent { + #[allow(clippy::missing_const_for_fn)] // const cannot run destructor pub fn into_event(self) -> Event { match self { Self::Passthrough(event) => event, diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 73beba89bf1a5..d52a15926b8a0 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -298,13 +298,13 @@ impl Runner { test_case.events.clone(), input_tx, &runner_metrics, - maybe_runner_encoder.as_ref().map(|encoder| encoder.clone()), + maybe_runner_encoder.as_ref().cloned(), ); let output_driver = spawn_output_driver( output_rx, &runner_metrics, - maybe_runner_encoder.as_ref().map(|encoder| encoder.clone()), + maybe_runner_encoder.as_ref().cloned(), ); // At this point, the component topology is running, and all input/output/telemetry @@ -516,7 +516,7 @@ fn spawn_input_driver( runner_metrics: &Arc>, mut maybe_encoder: Option>, ) -> JoinHandle<()> { - let input_runner_metrics = Arc::clone(&runner_metrics); + let input_runner_metrics = Arc::clone(runner_metrics); tokio::spawn(async move { for input_event in input_events { @@ -529,9 +529,9 @@ fn spawn_input_driver( // be used in the Validators, as the "expected" case. let mut input_runner_metrics = input_runner_metrics.lock().unwrap(); - if let Some(mut encoder) = maybe_encoder.as_mut() { + if let Some(encoder) = maybe_encoder.as_mut() { let mut buffer = BytesMut::new(); - encode_test_event(&mut encoder, &mut buffer, input_event.clone()); + encode_test_event(encoder, &mut buffer, input_event.clone()); input_runner_metrics.sent_bytes_total += buffer.len() as u64; } @@ -557,7 +557,7 @@ fn spawn_output_driver( runner_metrics: &Arc>, maybe_encoder: Option>, ) -> JoinHandle> { - let output_runner_metrics = Arc::clone(&runner_metrics); + let output_runner_metrics = Arc::clone(runner_metrics); tokio::spawn(async move { let mut output_events = Vec::new(); From f9854bf53cd59f4233b6b02cb0604abec449ae7a Mon Sep 17 00:00:00 2001 From: neuronull Date: Wed, 12 Jul 2023 14:09:34 -0600 Subject: [PATCH 08/11] feedback tz: sent_eventssssss --- src/components/validation/mod.rs | 2 +- src/components/validation/runner/mod.rs | 2 +- src/components/validation/validators/component_spec/sources.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/components/validation/mod.rs b/src/components/validation/mod.rs index dd335140b97f5..90723ff416a50 100644 --- a/src/components/validation/mod.rs +++ b/src/components/validation/mod.rs @@ -180,7 +180,7 @@ pub struct RunnerMetrics { pub received_bytes_total: u64, pub sent_bytes_total: u64, // a reciprocal for received_bytes_total pub sent_event_bytes_total: u64, - pub sent_event_total: u64, + pub sent_events_total: u64, } #[cfg(all(test, feature = "component-validation-tests"))] diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index d52a15926b8a0..7c4024e6dda22 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -543,7 +543,7 @@ fn spawn_input_driver( // account for failure case if !modified { - input_runner_metrics.sent_event_total += 1; + input_runner_metrics.sent_events_total += 1; input_runner_metrics.sent_event_bytes_total += vec![event].estimated_json_encoded_size_of().get() as u64; diff --git a/src/components/validation/validators/component_spec/sources.rs b/src/components/validation/validators/component_spec/sources.rs index 04b8aa138dd98..6c1a3dca88399 100644 --- a/src/components/validation/validators/component_spec/sources.rs +++ b/src/components/validation/validators/component_spec/sources.rs @@ -158,7 +158,7 @@ fn validate_component_received_events_total( ) -> Result, Vec> { // The reciprocal metric for events received is events sent, // so the expected value is what the input runner sent. - let expected_events = runner_metrics.sent_event_total; + let expected_events = runner_metrics.sent_events_total; validate_events_total( telemetry_events, From 0577ee62f4b7b06d4d1f07e0adcdf25715a89a87 Mon Sep 17 00:00:00 2001 From: neuronull Date: Wed, 12 Jul 2023 14:57:44 -0600 Subject: [PATCH 09/11] feedback tz: fix telemetry shutdown finishing logic --- src/components/validation/resources/http.rs | 16 ++++---- src/components/validation/resources/mod.rs | 2 +- src/components/validation/runner/io.rs | 31 +++++++------- src/components/validation/runner/mod.rs | 41 ++++++++++--------- src/components/validation/runner/telemetry.rs | 41 +++++++------------ 5 files changed, 61 insertions(+), 70 deletions(-) diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index 5b88234788381..44e2b7ba7579b 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -60,7 +60,7 @@ impl HttpResourceConfig { self, direction: ResourceDirection, codec: ResourceCodec, - output_tx: mpsc::Sender, + output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, ) { match direction { @@ -223,7 +223,7 @@ fn spawn_input_http_client( fn spawn_output_http_server( config: HttpResourceConfig, codec: ResourceCodec, - output_tx: mpsc::Sender, + output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, ) { // This HTTP server will wait for events to be sent by a sink, and collect them and send them on @@ -245,12 +245,10 @@ fn spawn_output_http_server( loop { match decoder.decode_eof(&mut body) { Ok(Some((events, _byte_size))) => { - for event in events { - output_tx - .send(event) - .await - .expect("should not fail to send output event"); - } + output_tx + .send(events.to_vec()) + .await + .expect("should not fail to send output event"); } Ok(None) => return StatusCode::OK.into_response(), Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(), @@ -283,7 +281,7 @@ fn spawn_output_http_server( fn spawn_output_http_client( _config: HttpResourceConfig, _codec: ResourceCodec, - _output_tx: mpsc::Sender, + _output_tx: mpsc::Sender>, _task_coordinator: &TaskCoordinator, ) { // TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 22e59018da427..a22d6fc324dbd 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -308,7 +308,7 @@ impl ExternalResource { /// Spawns this resource for use as an output for a sink. pub fn spawn_as_output( self, - output_tx: mpsc::Sender, + output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, ) { match self.definition { diff --git a/src/components/validation/runner/io.rs b/src/components/validation/runner/io.rs index c454ba433c847..55e4fca1eaecf 100644 --- a/src/components/validation/runner/io.rs +++ b/src/components/validation/runner/io.rs @@ -27,11 +27,11 @@ use crate::{ #[derive(Clone)] pub struct EventForwardService { - tx: mpsc::Sender, + tx: mpsc::Sender>, } -impl From> for EventForwardService { - fn from(tx: mpsc::Sender) -> Self { +impl From>> for EventForwardService { + fn from(tx: mpsc::Sender>) -> Self { Self { tx } } } @@ -42,14 +42,17 @@ impl VectorService for EventForwardService { &self, request: tonic::Request, ) -> Result, Status> { - let events = request.into_inner().events.into_iter().map(Event::from); - - for event in events { - self.tx - .send(event) - .await - .expect("event forward rx should not close first"); - } + let events = request + .into_inner() + .events + .into_iter() + .map(Event::from) + .collect(); + + self.tx + .send(events) + .await + .expect("event forward rx should not close first"); Ok(tonic::Response::new(PushEventsResponse {})) } @@ -74,7 +77,7 @@ pub struct InputEdge { pub struct OutputEdge { listen_addr: GrpcAddress, service: VectorServer, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, } impl InputEdge { @@ -129,7 +132,7 @@ impl OutputEdge { pub fn spawn_output_server( self, task_coordinator: &TaskCoordinator, - ) -> mpsc::Receiver { + ) -> mpsc::Receiver> { spawn_grpc_server(self.listen_addr, self.service, task_coordinator); self.rx } @@ -184,5 +187,5 @@ pub fn spawn_grpc_server( pub struct ControlledEdges { pub input: Option>, - pub output: Option>, + pub output: Option>>, } diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 7c4024e6dda22..b4bcd72de75bf 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -89,7 +89,7 @@ pub enum RunnerOutput { /// external resource pulls output events from the sink. /// /// Only sinks have external inputs. - External(mpsc::Receiver), + External(mpsc::Receiver>), /// The component uses a "controlled" edge for its output. /// @@ -109,8 +109,8 @@ impl RunnerOutput { /// this function will panic, as one or the other must be provided. pub fn into_receiver( self, - controlled_edge: Option>, - ) -> mpsc::Receiver { + controlled_edge: Option>>, + ) -> mpsc::Receiver> { match (self, controlled_edge) { (Self::External(_), Some(_)) => panic!("Runner output declared as external resource, but controlled output edge was also specified."), (Self::Controlled, None) => panic!("Runner output declared as controlled, but no controlled output edge was specified."), @@ -553,7 +553,7 @@ fn spawn_input_driver( } fn spawn_output_driver( - mut output_rx: Receiver, + mut output_rx: Receiver>, runner_metrics: &Arc>, maybe_encoder: Option>, ) -> JoinHandle> { @@ -561,26 +561,29 @@ fn spawn_output_driver( tokio::spawn(async move { let mut output_events = Vec::new(); - while let Some(output_event) = output_rx.recv().await { - output_events.push(output_event.clone()); + while let Some(events) = output_rx.recv().await { + output_events.extend(events.clone()); // Update the runner metrics for the received event. This will later // be used in the Validators, as the "expected" case. let mut output_runner_metrics = output_runner_metrics.lock().unwrap(); - output_runner_metrics.received_events_total += 1; - output_runner_metrics.received_event_bytes_total += vec![output_event.clone()] - .estimated_json_encoded_size_of() - .get() as u64; - - if let Some(encoder) = maybe_encoder.as_ref() { - let mut buffer = BytesMut::new(); - encoder - .clone() - .encode(output_event, &mut buffer) - .expect("should not fail to encode output event"); - - output_runner_metrics.received_bytes_total += buffer.len() as u64; + for output_event in events { + output_runner_metrics.received_events_total += 1; + output_runner_metrics.received_event_bytes_total += vec![output_event.clone()] + .estimated_json_encoded_size_of() + .get() + as u64; + + if let Some(encoder) = maybe_encoder.as_ref() { + let mut buffer = BytesMut::new(); + encoder + .clone() + .encode(output_event, &mut buffer) + .expect("should not fail to encode output event"); + + output_runner_metrics.received_bytes_total += buffer.len() as u64; + } } } output_events diff --git a/src/components/validation/runner/telemetry.rs b/src/components/validation/runner/telemetry.rs index 7238b39160628..f69dc40217a92 100644 --- a/src/components/validation/runner/telemetry.rs +++ b/src/components/validation/runner/telemetry.rs @@ -23,16 +23,13 @@ const INTERNAL_LOGS_KEY: &str = "_telemetry_logs"; const INTERNAL_METRICS_KEY: &str = "_telemetry_metrics"; const VECTOR_SINK_KEY: &str = "_telemetry_out"; -const SHUTDOWN_TICKS: u8 = 3; - -// The metrics event to monitor for before shutting down a telemetry collector. -const INTERNAL_METRICS_SHUTDOWN_EVENT: &str = "component_received_events_total"; +const SHUTDOWN_TICKS: u8 = 2; /// Telemetry collector for a component under validation. pub struct Telemetry { listen_addr: GrpcAddress, service: VectorServer, - rx: mpsc::Receiver, + rx: mpsc::Receiver>, } impl Telemetry { @@ -100,20 +97,19 @@ impl Telemetry { select! { _ = telemetry_shutdown_handle.wait() => { // After we receive the shutdown signal, we need to wait - // for two event emissions from the internal_metrics + // for two batches of event emissions from the internal_metrics // source. This is to ensure that we've received all the // events from the components that we're testing. // // We need exactly two because the internal_metrics // source does not emit component events until after the // component_received_events_total metric has been - // emitted. Thus, two events ensure that all component + // emitted. Thus, two batches ensure that all component // events have been emitted. debug!("Telemetry: waiting for final internal_metrics events before shutting down."); - let mut events_seen = 0; - let current_time = chrono::Utc::now(); + let mut batches_received = 0; let timeout = tokio::time::sleep(Duration::from_secs(5)); tokio::pin!(timeout); @@ -123,21 +119,12 @@ impl Telemetry { d = rx.recv() => { match d { None => break, - Some(telemetry_event) => { - telemetry_events.push(telemetry_event.clone()); - if let Event::Metric(metric) = telemetry_event { - if let Some(tags) = metric.tags() { - if metric.name() == INTERNAL_METRICS_SHUTDOWN_EVENT && - tags.get("component_name") == Some(INTERNAL_LOGS_KEY) && - metric.data().timestamp().unwrap() > ¤t_time { - debug!("Telemetry: processed one component_received_events_total event."); - - events_seen += 1; - if events_seen == SHUTDOWN_TICKS { - break; - } - } - } + Some(telemetry_event_batch) => { + telemetry_events.extend(telemetry_event_batch); + debug!("Telemetry: processed one batch of internal_metrics."); + batches_received += 1; + if batches_received == SHUTDOWN_TICKS { + break; } } } @@ -145,14 +132,14 @@ impl Telemetry { _ = &mut timeout => break, } } - if events_seen != SHUTDOWN_TICKS { - panic!("Did not receive {SHUTDOWN_TICKS} events while waiting for shutdown! Only received {events_seen}!"); + if batches_received != SHUTDOWN_TICKS { + panic!("Did not receive {SHUTDOWN_TICKS} events while waiting for shutdown! Only received {batches_received}!"); } break 'outer; }, maybe_telemetry_event = rx.recv() => match maybe_telemetry_event { None => break, - Some(telemetry_event) => telemetry_events.push(telemetry_event), + Some(telemetry_event_batch) => telemetry_events.extend(telemetry_event_batch), }, } } From 51e9ab47ce1d86aba933a2946c84b8ea141352ac Mon Sep 17 00:00:00 2001 From: neuronull Date: Thu, 13 Jul 2023 14:01:10 -0600 Subject: [PATCH 10/11] 3 ticks --- src/components/validation/runner/telemetry.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/components/validation/runner/telemetry.rs b/src/components/validation/runner/telemetry.rs index f69dc40217a92..76e8c29f62a9f 100644 --- a/src/components/validation/runner/telemetry.rs +++ b/src/components/validation/runner/telemetry.rs @@ -23,7 +23,7 @@ const INTERNAL_LOGS_KEY: &str = "_telemetry_logs"; const INTERNAL_METRICS_KEY: &str = "_telemetry_metrics"; const VECTOR_SINK_KEY: &str = "_telemetry_out"; -const SHUTDOWN_TICKS: u8 = 2; +const SHUTDOWN_TICKS: u8 = 3; /// Telemetry collector for a component under validation. pub struct Telemetry { From b7a7bd303ccf1e1976bb1d5f4c899cf3278a5397 Mon Sep 17 00:00:00 2001 From: neuronull Date: Wed, 19 Jul 2023 09:39:41 -0600 Subject: [PATCH 11/11] feedback tz- from not try_from --- src/components/validation/resources/event.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index beb9c244c2fbd..1f91acfbe660d 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -53,7 +53,7 @@ impl EventData { /// from the event data twice (once for the expected and once for actual), it can result in a timestamp in /// the event which may or may not have the same millisecond precision as it's counterpart. #[derive(Clone, Debug, Deserialize)] -#[serde(try_from = "RawTestEvent")] +#[serde(from = "RawTestEvent")] #[serde(untagged)] pub enum TestEvent { /// The event is used, as-is, without modification. @@ -84,11 +84,9 @@ impl TestEvent { #[derive(Clone, Debug, Eq, PartialEq, Snafu)] pub enum RawTestEventParseError {} -impl TryFrom for TestEvent { - type Error = RawTestEventParseError; - - fn try_from(other: RawTestEvent) -> Result { - Ok(match other { +impl From for TestEvent { + fn from(other: RawTestEvent) -> Self { + match other { RawTestEvent::Passthrough(event_data) => { TestEvent::Passthrough(event_data.into_event()) } @@ -96,7 +94,7 @@ impl TryFrom for TestEvent { modified, event: event.into_event(), }, - }) + } } }