diff --git a/src/components/validation/mod.rs b/src/components/validation/mod.rs index 37f5d6668cf18..f1ef68609e59c 100644 --- a/src/components/validation/mod.rs +++ b/src/components/validation/mod.rs @@ -14,6 +14,14 @@ pub use self::runner::*; pub use self::test_case::{TestCase, TestCaseExpectation}; pub use self::validators::*; +pub mod component_names { + pub const TEST_SOURCE_NAME: &str = "test_source"; + pub const TEST_SINK_NAME: &str = "test_sink"; + pub const TEST_TRANSFORM_NAME: &str = "test_transform"; + pub const TEST_INPUT_SOURCE_NAME: &str = "input_source"; + pub const TEST_OUTPUT_SINK_NAME: &str = "output_sink"; +} + /// Component types that can be validated. // TODO: We should centralize this in `vector-common` or something, where both this code and the // configuration schema stuff (namely the proc macros that use this) can share it. @@ -173,15 +181,16 @@ macro_rules! register_validatable_component { /// Input and Output runners populate this structure as they send and receive events. /// The structure is passed into the validator to use as the expected values for the /// metrics that the components under test actually output. -#[derive(Default)] +#[derive(Default, Debug)] pub struct RunnerMetrics { pub received_events_total: u64, pub received_event_bytes_total: u64, pub received_bytes_total: u64, - pub sent_bytes_total: u64, // a reciprocal for received_bytes_total + pub sent_bytes_total: u64, pub sent_event_bytes_total: u64, pub sent_events_total: u64, pub errors_total: u64, + pub discarded_events_total: u64, } #[cfg(all(test, feature = "component-validation-tests"))] diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index 4bce09acd4ce5..96870f6b773af 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -79,6 +79,20 @@ impl TestEvent { Self::Modified { event, .. } => event, } } + + pub fn get_event(&mut self) -> &mut Event { + match self { + Self::Passthrough(event) => event, + Self::Modified { event, .. } => event, + } + } + + pub fn get(self) -> (bool, Event) { + match self { + Self::Passthrough(event) => (false, event), + Self::Modified { modified, event } => (modified, event), + } + } } #[derive(Clone, Debug, Eq, PartialEq, Snafu)] diff --git a/src/components/validation/resources/http.rs b/src/components/validation/resources/http.rs index 052dabec52cf8..b2fb003ff688f 100644 --- a/src/components/validation/resources/http.rs +++ b/src/components/validation/resources/http.rs @@ -20,8 +20,11 @@ use tokio::{ }; use tokio_util::codec::Decoder; -use crate::components::validation::sync::{Configuring, TaskCoordinator}; -use vector_lib::event::Event; +use crate::components::validation::{ + sync::{Configuring, TaskCoordinator}, + RunnerMetrics, +}; +use vector_lib::{event::Event, EstimatedJsonEncodedSizeOf}; use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent}; @@ -43,11 +46,12 @@ impl HttpResourceConfig { codec: ResourceCodec, input_rx: mpsc::Receiver, task_coordinator: &TaskCoordinator, + runner_metrics: &Arc>, ) { match direction { // The source will pull data from us. ResourceDirection::Pull => { - spawn_input_http_server(self, codec, input_rx, task_coordinator) + spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics) } // We'll push data to the source. ResourceDirection::Push => { @@ -62,6 +66,7 @@ impl HttpResourceConfig { codec: ResourceCodec, output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, + runner_metrics: &Arc>, ) -> vector_lib::Result<()> { match direction { // We'll pull data from the sink. @@ -73,7 +78,7 @@ impl HttpResourceConfig { )), // The sink will push data to us. ResourceDirection::Push => { - spawn_output_http_server(self, codec, output_tx, task_coordinator) + spawn_output_http_server(self, codec, output_tx, task_coordinator, runner_metrics) } } } @@ -86,6 +91,7 @@ fn spawn_input_http_server( codec: ResourceCodec, mut input_rx: mpsc::Receiver, task_coordinator: &TaskCoordinator, + runner_metrics: &Arc>, ) { // This HTTP server will poll the input receiver for input events and buffer them. When a // request comes in on the right path/method, one buffered input event will be sent back. If no @@ -97,8 +103,11 @@ fn spawn_input_http_server( let encoder = codec.into_encoder(); let sendable_events = Arc::clone(&outstanding_events); - let (resource_notifier, http_server_shutdown_tx) = - spawn_http_server(task_coordinator, &config, move |_| { + let (resource_notifier, http_server_shutdown_tx) = spawn_http_server( + task_coordinator, + &config, + runner_metrics, + move |_request, _runner_metrics| { let sendable_events = Arc::clone(&sendable_events); let mut encoder = encoder.clone(); @@ -116,12 +125,14 @@ fn spawn_input_http_server( StatusCode::OK.into_response() } } - }); + }, + ); // Now we'll create and spawn the resource's core logic loop which drives the buffering of input // events and working with the HTTP server as they're consumed. let resource_started = task_coordinator.track_started(); let resource_completed = task_coordinator.track_completed(); + tokio::spawn(async move { resource_started.mark_as_done(); debug!("HTTP server external input resource started."); @@ -140,7 +151,10 @@ fn spawn_input_http_server( let mut outstanding_events = outstanding_events.lock().await; outstanding_events.push_back(event); }, - None => input_finished = true, + None => { + trace!("HTTP server external input resource input is finished."); + input_finished = true; + }, }, _ = resource_notifier.notified() => { @@ -159,10 +173,15 @@ fn spawn_input_http_server( }, } } - // Mark ourselves as completed now that we've sent all inputs to the source, and // additionally signal the HTTP server to also gracefully shutdown. _ = http_server_shutdown_tx.send(()); + + // TODO - currently we are getting lucky in the testing of `http_client` source... if the source tries to query + // this server but we have already shut down the thread, then it will generate an error which can throw off our error + // validation. + // I think the solution involves adding synchronization to wait here for the runner to tell us to shutdown. + resource_completed.mark_as_done(); debug!("HTTP server external input resource completed."); @@ -230,6 +249,7 @@ fn spawn_output_http_server( codec: ResourceCodec, output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, + runner_metrics: &Arc>, ) -> vector_lib::Result<()> { // This HTTP server will wait for events to be sent by a sink, and collect them and send them on // via an output sender. We accept/collect events until we're told to shutdown. @@ -237,8 +257,11 @@ fn spawn_output_http_server( // First, we'll build and spawn our HTTP server. let decoder = codec.into_decoder()?; - let (_, http_server_shutdown_tx) = - spawn_http_server(task_coordinator, &config, move |request| { + let (_, http_server_shutdown_tx) = spawn_http_server( + task_coordinator, + &config, + runner_metrics, + move |request, output_runner_metrics| { let output_tx = output_tx.clone(); let mut decoder = decoder.clone(); @@ -249,7 +272,23 @@ fn spawn_output_http_server( let mut body = BytesMut::from(&body[..]); loop { match decoder.decode_eof(&mut body) { - Ok(Some((events, _byte_size))) => { + Ok(Some((events, byte_size))) => { + let mut output_runner_metrics = + output_runner_metrics.lock().await; + debug!("HTTP server external output resource decoded {byte_size} bytes."); + + // Update the runner metrics for the received events. This will later + // be used in the Validators, as the "expected" case. + output_runner_metrics.received_bytes_total += byte_size as u64; + + output_runner_metrics.received_events_total += + events.len() as u64; + + events.iter().for_each(|event| { + output_runner_metrics.received_event_bytes_total += + event.estimated_json_encoded_size_of().get() as u64; + }); + output_tx .send(events.to_vec()) .await @@ -262,13 +301,15 @@ fn spawn_output_http_server( } } } - }); + }, + ); // Now we'll create and spawn the resource's core logic loop which simply waits for the runner // to instruct us to shutdown, and when that happens, cascades to shutting down the HTTP server. let resource_started = task_coordinator.track_started(); let resource_completed = task_coordinator.track_completed(); let mut resource_shutdown_rx = task_coordinator.register_for_shutdown(); + tokio::spawn(async move { resource_started.mark_as_done(); debug!("HTTP server external output resource started."); @@ -299,10 +340,11 @@ fn spawn_output_http_client( fn spawn_http_server( task_coordinator: &TaskCoordinator, config: &HttpResourceConfig, + runner_metrics: &Arc>, handler: H, ) -> (Arc, oneshot::Sender<()>) where - H: Fn(Request) -> F + Clone + Send + 'static, + H: Fn(Request, Arc>) -> F + Clone + Send + 'static, F: Future + Send, R: IntoResponse, { @@ -327,6 +369,8 @@ where let resource_notifier = Arc::new(Notify::new()); let server_notifier = Arc::clone(&resource_notifier); + let output_runner_metrics = Arc::clone(runner_metrics); + tokio::spawn(async move { // Create our HTTP server by binding as early as possible to return an error if we can't // actually bind. @@ -353,7 +397,7 @@ where StatusCode::METHOD_NOT_ALLOWED }) .on(method_filter, move |request: Request| { - let request_handler = handler(request); + let request_handler = handler(request, output_runner_metrics); let notifier = Arc::clone(&server_notifier); async move { diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 61ae25d02435c..04c90a8bbeaf2 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -1,7 +1,9 @@ mod event; mod http; -use tokio::sync::mpsc; +use std::sync::Arc; + +use tokio::sync::{mpsc, Mutex}; use vector_lib::codecs::{ decoding::{self, DeserializerConfig}, encoding::{ @@ -16,7 +18,10 @@ use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingCo pub use self::event::{encode_test_event, TestEvent}; pub use self::http::HttpResourceConfig; -use super::sync::{Configuring, TaskCoordinator}; +use super::{ + sync::{Configuring, TaskCoordinator}, + RunnerMetrics, +}; /// The codec used by the external resource. /// @@ -292,7 +297,7 @@ impl From for ResourceDefinition { /// the external resource must pull the data from the sink. #[derive(Clone)] pub struct ExternalResource { - direction: ResourceDirection, + pub direction: ResourceDirection, definition: ResourceDefinition, pub codec: ResourceCodec, } @@ -316,11 +321,16 @@ impl ExternalResource { self, input_rx: mpsc::Receiver, task_coordinator: &TaskCoordinator, + runner_metrics: &Arc>, ) { match self.definition { - ResourceDefinition::Http(http_config) => { - http_config.spawn_as_input(self.direction, self.codec, input_rx, task_coordinator) - } + ResourceDefinition::Http(http_config) => http_config.spawn_as_input( + self.direction, + self.codec, + input_rx, + task_coordinator, + runner_metrics, + ), } } @@ -329,11 +339,16 @@ impl ExternalResource { self, output_tx: mpsc::Sender>, task_coordinator: &TaskCoordinator, + runner_metrics: &Arc>, ) -> vector_lib::Result<()> { match self.definition { - ResourceDefinition::Http(http_config) => { - http_config.spawn_as_output(self.direction, self.codec, output_tx, task_coordinator) - } + ResourceDefinition::Http(http_config) => http_config.spawn_as_output( + self.direction, + self.codec, + output_tx, + task_coordinator, + runner_metrics, + ), } } } diff --git a/src/components/validation/runner/config.rs b/src/components/validation/runner/config.rs index f80746ab21df1..ed721fea929f0 100644 --- a/src/components/validation/runner/config.rs +++ b/src/components/validation/runner/config.rs @@ -1,5 +1,6 @@ use crate::{ components::validation::{ + component_names::*, sync::{Configuring, TaskCoordinator}, util::GrpcAddress, ComponentConfiguration, ComponentType, ValidationConfiguration, @@ -21,12 +22,6 @@ pub struct TopologyBuilder { output_edge: Option, } -pub const TEST_SOURCE_NAME: &str = "test_source"; -pub const TEST_SINK_NAME: &str = "test_sink"; -pub const TEST_TRANSFORM_NAME: &str = "test_transform"; -pub const TEST_INPUT_SOURCE_NAME: &str = "input_source"; -pub const TEST_OUTPUT_SINK_NAME: &str = "output_sink"; - impl TopologyBuilder { /// Creates a component topology for the given component configuration. pub fn from_configuration(configuration: &ValidationConfiguration) -> Self { diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index a01ed4f13e495..5014edc5aaa40 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -2,24 +2,24 @@ pub mod config; mod io; mod telemetry; -use std::{ - collections::HashMap, - path::PathBuf, - sync::{Arc, Mutex}, - time::Duration, -}; +use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use bytes::BytesMut; +use chrono::Utc; use tokio::{ runtime::Builder, select, - sync::mpsc::{self, Receiver, Sender}, + sync::{ + mpsc::{self, Receiver, Sender}, + Mutex, + }, task::JoinHandle, }; use tokio_util::codec::Encoder as _; -use vector_lib::codecs::encoding; -use vector_lib::{event::Event, EstimatedJsonEncodedSizeOf}; +use vector_lib::{ + codecs::encoding, config::LogNamespace, event::Event, EstimatedJsonEncodedSizeOf, +}; use crate::{ codecs::Encoder, @@ -234,7 +234,7 @@ impl Runner { ) .await; - debug!("Component topology configuration built and telemetry collector spawned."); + info!("Component topology configuration built and telemetry collector spawned."); // Create the data structure that the input and output runners will use to store // their received/sent metrics. This is then shared with the Validator for comparison @@ -255,22 +255,23 @@ impl Runner { &self.configuration, &input_task_coordinator, &output_task_coordinator, + &runner_metrics, )?; let input_tx = runner_input.into_sender(controlled_edges.input); let output_rx = runner_output.into_receiver(controlled_edges.output); - debug!("External resource (if any) and controlled edges built and spawned."); + info!("External resource (if any) and controlled edges built and spawned."); // Now with any external resource spawned, as well as any tasks for handling controlled // edges, we'll wait for all of those tasks to report that they're ready to go and // listening, etc. let input_task_coordinator = input_task_coordinator.started().await; - debug!("All input task(s) started."); + info!("All input task(s) started."); let telemetry_task_coordinator = telemetry_task_coordinator.started().await; - debug!("All telemetry task(s) started."); + info!("All telemetry task(s) started."); let output_task_coordinator = output_task_coordinator.started().await; - debug!("All output task(s) started."); + info!("All output task(s) started."); // At this point, we need to actually spawn the configured component topology so that it // runs, and make sure we have a way to tell it when to shutdown so that we can properly @@ -307,12 +308,14 @@ impl Runner { input_tx, &runner_metrics, maybe_runner_encoder.as_ref().cloned(), + self.configuration.component_type == ComponentType::Source, ); let output_driver = spawn_output_driver( output_rx, &runner_metrics, maybe_runner_encoder.as_ref().cloned(), + self.configuration.component_type == ComponentType::Sink, ); // At this point, the component topology is running, and all input/output/telemetry @@ -329,20 +332,31 @@ impl Runner { .expect("input driver task should not have panicked"); input_task_coordinator.shutdown().await; - debug!("Input task(s) have been shutdown."); + info!("Input task(s) have been shutdown."); + + // Without this, not all internal metric events are received for sink components under test. + // TODO: This is awful and needs a proper solution. + // I think we are going to need to setup distinct task sync logic potentially for each + // combination of Source/Sink + Resource direction Push/Pull + if self.configuration.component_type == ComponentType::Sink { + tokio::time::sleep(Duration::from_secs(1)).await; + } telemetry_task_coordinator.shutdown().await; - debug!("Telemetry task(s) have been shutdown."); + info!("Telemetry task(s) have been shutdown."); topology_task_coordinator.shutdown().await; - debug!("Component topology task has been shutdown."); + info!("Component topology task has been shutdown."); output_task_coordinator.shutdown().await; - debug!("Output task(s) have been shutdown."); + info!("Output task(s) have been shutdown."); let output_events = output_driver .await - .expect("input driver task should not have panicked"); + .expect("output driver task should not have panicked"); + + info!("Collected runner metrics: {:?}", runner_metrics); + let final_runner_metrics = runner_metrics.lock().await; // Run the relevant data -- inputs, outputs, telemetry, etc -- through each validator to // get the validation results for this test. @@ -363,7 +377,7 @@ impl Runner { &input_events, &output_events, &telemetry_events, - &runner_metrics.lock().unwrap(), + &final_runner_metrics, ) }) .collect(); @@ -421,6 +435,7 @@ fn build_external_resource( configuration: &ValidationConfiguration, input_task_coordinator: &TaskCoordinator, output_task_coordinator: &TaskCoordinator, + runner_metrics: &Arc>, ) -> Result<(RunnerInput, RunnerOutput, Option>), vector_lib::Error> { let component_type = configuration.component_type(); let maybe_external_resource = configuration.external_resource(); @@ -436,7 +451,7 @@ fn build_external_resource( let (tx, rx) = mpsc::channel(1024); let resource = maybe_external_resource.expect("a source must always have an external resource"); - resource.spawn_as_input(rx, input_task_coordinator); + resource.spawn_as_input(rx, input_task_coordinator, runner_metrics); Ok(( RunnerInput::External(tx), @@ -456,7 +471,7 @@ fn build_external_resource( let (tx, rx) = mpsc::channel(1024); let resource = maybe_external_resource.expect("a sink must always have an external resource"); - resource.spawn_as_output(tx, output_task_coordinator)?; + resource.spawn_as_output(tx, output_task_coordinator, runner_metrics)?; Ok(( RunnerInput::Controlled, @@ -488,22 +503,22 @@ fn spawn_component_topology( .expect("should not fail to build current-thread runtime"); test_runtime.block_on(async move { - debug!("Building component topology..."); + info!("Building component topology..."); let (topology, mut crash_rx) = RunningTopology::start_init_validated(config, extra_context) .await .unwrap(); - debug!("Component topology built and spawned."); + info!("Component topology built and spawned."); topology_started.mark_as_done(); select! { // We got the signal to shutdown, so stop the topology gracefully. _ = topology_shutdown_handle.wait() => { - debug!("Shutdown signal received, stopping topology..."); + info!("Shutdown signal received, stopping topology..."); topology.stop().await; - debug!("Component topology stopped gracefully.") + info!("Component topology stopped gracefully.") }, _ = crash_rx.recv() => { error!("Component topology under validation unexpectedly crashed."); @@ -520,11 +535,15 @@ fn spawn_input_driver( input_tx: Sender, runner_metrics: &Arc>, mut maybe_encoder: Option>, + is_source: bool, ) -> JoinHandle<()> { let input_runner_metrics = Arc::clone(runner_metrics); + let log_namespace = LogNamespace::Legacy; + let now = Utc::now(); + tokio::spawn(async move { - for input_event in input_events { + for mut input_event in input_events { input_tx .send(input_event.clone()) .await @@ -532,30 +551,39 @@ fn spawn_input_driver( // Update the runner metrics for the sent event. This will later // be used in the Validators, as the "expected" case. - let mut input_runner_metrics = input_runner_metrics.lock().unwrap(); + let mut input_runner_metrics = input_runner_metrics.lock().await; + + // the controlled edge (vector source) adds metadata to the event when it is received. + // thus we need to add it here so the expected values for the comparisons on transforms + // and sinks are accurate. + if !is_source { + if let Event::Log(ref mut log) = input_event.get_event() { + log_namespace.insert_standard_vector_source_metadata(log, "vector", now); + } + } + + let (modified, event) = input_event.clone().get(); if let Some(encoder) = maybe_encoder.as_mut() { let mut buffer = BytesMut::new(); - encode_test_event(encoder, &mut buffer, input_event.clone()); + encode_test_event(encoder, &mut buffer, input_event); input_runner_metrics.sent_bytes_total += buffer.len() as u64; } - let (modified, event) = match input_event { - TestEvent::Passthrough(event) => (false, event), - TestEvent::Modified { modified, event } => (modified, event), - }; - // account for failure case if modified { input_runner_metrics.errors_total += 1; } else { input_runner_metrics.sent_events_total += 1; + // The event is wrapped in a Vec to match the actual event storage in + // the real topology input_runner_metrics.sent_event_bytes_total += vec![event].estimated_json_encoded_size_of().get() as u64; } } + trace!("Input driver sent all events."); }) } @@ -563,6 +591,7 @@ fn spawn_output_driver( mut output_rx: Receiver>, runner_metrics: &Arc>, maybe_encoder: Option>, + is_sink: bool, ) -> JoinHandle> { let output_runner_metrics = Arc::clone(runner_metrics); @@ -573,23 +602,25 @@ fn spawn_output_driver( // Update the runner metrics for the received event. This will later // be used in the Validators, as the "expected" case. - let mut output_runner_metrics = output_runner_metrics.lock().unwrap(); + let mut output_runner_metrics = output_runner_metrics.lock().await; for output_event in events { - output_runner_metrics.received_events_total += 1; - output_runner_metrics.received_event_bytes_total += vec![output_event.clone()] - .estimated_json_encoded_size_of() - .get() - as u64; - - if let Some(encoder) = maybe_encoder.as_ref() { - let mut buffer = BytesMut::new(); - encoder - .clone() - .encode(output_event, &mut buffer) - .expect("should not fail to encode output event"); - - output_runner_metrics.received_bytes_total += buffer.len() as u64; + if !is_sink { + // The event is wrapped in a Vec to match the actual event storage in + // the real topology + output_runner_metrics.received_event_bytes_total += + vec![&output_event].estimated_json_encoded_size_of().get() as u64; + + if let Some(encoder) = maybe_encoder.as_ref() { + let mut buffer = BytesMut::new(); + encoder + .clone() + .encode(output_event, &mut buffer) + .expect("should not fail to encode output event"); + + output_runner_metrics.received_events_total += 1; + output_runner_metrics.received_bytes_total += buffer.len() as u64; + } } } } diff --git a/src/components/validation/validators/component_spec/mod.rs b/src/components/validation/validators/component_spec/mod.rs index cfd1f8a5a88f5..799f27a394de3 100644 --- a/src/components/validation/validators/component_spec/mod.rs +++ b/src/components/validation/validators/component_spec/mod.rs @@ -1,12 +1,9 @@ -mod sources; +use crate::components::validation::{ + component_names::*, ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent, +}; +use vector_lib::event::{Event, Metric, MetricKind}; -use vector_lib::event::{Event, Metric}; - -use crate::components::validation::{ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent}; - -use super::Validator; - -use self::sources::{validate_sources, SourceMetricType}; +use super::{ComponentMetricType, Validator}; /// Validates that the component meets the requirements of the [Component Specification][component_spec]. /// @@ -34,11 +31,11 @@ impl Validator for ComponentSpecValidator { runner_metrics: &RunnerMetrics, ) -> Result, Vec> { for input in inputs { - debug!("Validator observed input event: {:?}", input); + info!("Validator observed input event: {:?}", input); } for output in outputs { - debug!("Validator observed output event: {:?}", output); + info!("Validator observed output event: {:?}", output); } // Validate that the number of inputs/outputs matched the test case expectation. @@ -97,17 +94,28 @@ fn validate_telemetry( let mut out: Vec = Vec::new(); let mut errs: Vec = Vec::new(); - match component_type { - ComponentType::Source => { - let result = validate_sources(telemetry_events, runner_metrics); - match result { - Ok(o) => out.extend(o), - Err(e) => errs.extend(e), - } + let metric_types = [ + ComponentMetricType::EventsReceived, + ComponentMetricType::EventsReceivedBytes, + ComponentMetricType::ReceivedBytesTotal, + ComponentMetricType::SentEventsTotal, + ComponentMetricType::SentEventBytesTotal, + ComponentMetricType::SentBytesTotal, + ComponentMetricType::ErrorsTotal, + ComponentMetricType::DiscardedEventsTotal, + ]; + + metric_types.iter().for_each(|metric_type| { + match validate_metric( + telemetry_events, + runner_metrics, + metric_type, + component_type, + ) { + Err(e) => errs.extend(e), + Ok(m) => out.extend(m), } - ComponentType::Sink => {} - ComponentType::Transform => {} - } + }); if errs.is_empty() { Ok(out) @@ -116,11 +124,75 @@ fn validate_telemetry( } } +fn validate_metric( + telemetry_events: &[Event], + runner_metrics: &RunnerMetrics, + metric_type: &ComponentMetricType, + component_type: ComponentType, +) -> Result, Vec> { + let component_id = match component_type { + ComponentType::Source => TEST_SOURCE_NAME, + ComponentType::Transform => TEST_TRANSFORM_NAME, + ComponentType::Sink => TEST_SINK_NAME, + }; + + let expected = match metric_type { + ComponentMetricType::EventsReceived => { + // The reciprocal metric for events received is events sent, + // so the expected value is what the input runner sent. + runner_metrics.sent_events_total + } + ComponentMetricType::EventsReceivedBytes => { + // The reciprocal metric for received_event_bytes is sent_event_bytes, + // so the expected value is what the input runner sent. + runner_metrics.sent_event_bytes_total + } + ComponentMetricType::ReceivedBytesTotal => { + // The reciprocal metric for received_bytes is sent_bytes, + // so the expected value is what the input runner sent. + if component_type == ComponentType::Sink { + 0 // sinks should not emit this metric + } else { + runner_metrics.sent_bytes_total + } + } + ComponentMetricType::SentEventsTotal => { + // The reciprocal metric for events sent is events received, + // so the expected value is what the output runner received. + runner_metrics.received_events_total + } + ComponentMetricType::SentBytesTotal => { + // The reciprocal metric for sent_bytes is received_bytes, + // so the expected value is what the output runner received. + if component_type == ComponentType::Source { + 0 // sources should not emit this metric + } else { + runner_metrics.received_bytes_total + } + } + ComponentMetricType::SentEventBytesTotal => { + // The reciprocal metric for sent_event_bytes is received_event_bytes, + // so the expected value is what the output runner received. + runner_metrics.received_event_bytes_total + } + ComponentMetricType::ErrorsTotal => runner_metrics.errors_total, + ComponentMetricType::DiscardedEventsTotal => runner_metrics.discarded_events_total, + }; + + compare_actual_to_expected(telemetry_events, metric_type, component_id, expected) +} + fn filter_events_by_metric_and_component<'a>( telemetry_events: &'a [Event], - metric: &SourceMetricType, - component_name: &'a str, + metric: &ComponentMetricType, + component_id: &'a str, ) -> Vec<&'a Metric> { + info!( + "Filter looking for metric {} {}", + metric.to_string(), + component_id + ); + let metrics: Vec<&Metric> = telemetry_events .iter() .flat_map(|e| { @@ -133,7 +205,7 @@ fn filter_events_by_metric_and_component<'a>( .filter(|&m| { if m.name() == metric.to_string() { if let Some(tags) = m.tags() { - if tags.get("component_name").unwrap_or("") == component_name { + if tags.get("component_id").unwrap_or("") == component_id { return true; } } @@ -143,7 +215,63 @@ fn filter_events_by_metric_and_component<'a>( }) .collect(); - debug!("{}: {} metrics found.", metric.to_string(), metrics.len(),); + info!("{}: {} metrics found.", metric.to_string(), metrics.len()); metrics } + +fn sum_counters( + metric_name: &ComponentMetricType, + metrics: &[&Metric], +) -> Result> { + let mut sum: f64 = 0.0; + let mut errs = Vec::new(); + + for m in metrics { + match m.value() { + vector_lib::event::MetricValue::Counter { value } => { + if let MetricKind::Absolute = m.data().kind { + sum = *value; + } else { + sum += *value; + } + } + _ => errs.push(format!("{}: metric value is not a counter", metric_name,)), + } + } + + if errs.is_empty() { + Ok(sum as u64) + } else { + Err(errs) + } +} + +fn compare_actual_to_expected( + telemetry_events: &[Event], + metric_type: &ComponentMetricType, + component_id: &str, + expected: u64, +) -> Result, Vec> { + let mut errs: Vec = Vec::new(); + + let metrics = + filter_events_by_metric_and_component(telemetry_events, metric_type, component_id); + + let actual = sum_counters(metric_type, &metrics)?; + + info!("{}: expected {}, actual {}.", metric_type, expected, actual,); + + if actual != expected { + errs.push(format!( + "{}: expected {}, but received {}", + metric_type, expected, actual + )); + } + + if !errs.is_empty() { + return Err(errs); + } + + Ok(vec![format!("{}: {}", metric_type, actual)]) +} diff --git a/src/components/validation/validators/component_spec/sources.rs b/src/components/validation/validators/component_spec/sources.rs deleted file mode 100644 index e4c6d3383f688..0000000000000 --- a/src/components/validation/validators/component_spec/sources.rs +++ /dev/null @@ -1,242 +0,0 @@ -use std::fmt::{Display, Formatter}; - -use vector_lib::event::{Event, MetricKind}; - -use crate::components::validation::RunnerMetrics; - -use super::filter_events_by_metric_and_component; - -const TEST_SOURCE_NAME: &str = "test_source"; - -pub enum SourceMetricType { - EventsReceived, - EventsReceivedBytes, - ReceivedBytesTotal, - SentEventsTotal, - SentEventBytesTotal, - ErrorsTotal, -} - -impl SourceMetricType { - const fn name(&self) -> &'static str { - match self { - SourceMetricType::EventsReceived => "component_received_events_total", - SourceMetricType::EventsReceivedBytes => "component_received_event_bytes_total", - SourceMetricType::ReceivedBytesTotal => "component_received_bytes_total", - SourceMetricType::SentEventsTotal => "component_sent_events_total", - SourceMetricType::SentEventBytesTotal => "component_sent_event_bytes_total", - SourceMetricType::ErrorsTotal => "component_errors_total", - } - } -} - -impl Display for SourceMetricType { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.name()) - } -} - -pub fn validate_sources( - telemetry_events: &[Event], - runner_metrics: &RunnerMetrics, -) -> Result, Vec> { - let mut out: Vec = Vec::new(); - let mut errs: Vec = Vec::new(); - - let validations = [ - validate_component_received_events_total, - validate_component_received_event_bytes_total, - validate_component_received_bytes_total, - validate_component_sent_events_total, - validate_component_sent_event_bytes_total, - validate_component_errors_total, - ]; - - for v in validations.iter() { - match v(telemetry_events, runner_metrics) { - Err(e) => errs.extend(e), - Ok(m) => out.extend(m), - } - } - - if errs.is_empty() { - Ok(out) - } else { - Err(errs) - } -} - -fn sum_counters( - metric_name: &SourceMetricType, - metrics: &[&vector_lib::event::Metric], -) -> Result> { - let mut sum: f64 = 0.0; - let mut errs = Vec::new(); - - for m in metrics { - match m.value() { - vector_lib::event::MetricValue::Counter { value } => { - if let MetricKind::Absolute = m.data().kind { - sum = *value; - } else { - sum += *value; - } - } - _ => errs.push(format!("{}: metric value is not a counter", metric_name,)), - } - } - - if errs.is_empty() { - Ok(sum as u64) - } else { - Err(errs) - } -} - -fn validate_events_total( - telemetry_events: &[Event], - metric_type: &SourceMetricType, - expected_events: u64, -) -> Result, Vec> { - let mut errs: Vec = Vec::new(); - - let metrics = - filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); - - let actual_events = sum_counters(metric_type, &metrics)?; - - debug!( - "{}: {} events, {} expected events.", - metric_type, actual_events, expected_events, - ); - - if actual_events != expected_events { - errs.push(format!( - "{}: expected {} events, but received {}", - metric_type, expected_events, actual_events - )); - } - - if !errs.is_empty() { - return Err(errs); - } - - Ok(vec![format!("{}: {}", metric_type, actual_events)]) -} - -fn validate_bytes_total( - telemetry_events: &[Event], - metric_type: &SourceMetricType, - expected_bytes: u64, -) -> Result, Vec> { - let mut errs: Vec = Vec::new(); - - let metrics = - filter_events_by_metric_and_component(telemetry_events, metric_type, TEST_SOURCE_NAME); - - let actual_bytes = sum_counters(metric_type, &metrics)?; - - debug!( - "{}: {} bytes, {} expected bytes.", - metric_type, actual_bytes, expected_bytes, - ); - - if actual_bytes != expected_bytes { - errs.push(format!( - "{}: expected {} bytes, but received {}", - metric_type, expected_bytes, actual_bytes - )); - } - - if !errs.is_empty() { - return Err(errs); - } - - Ok(vec![format!("{}: {}", metric_type, actual_bytes)]) -} - -fn validate_component_received_events_total( - telemetry_events: &[Event], - runner_metrics: &RunnerMetrics, -) -> Result, Vec> { - // The reciprocal metric for events received is events sent, - // so the expected value is what the input runner sent. - let expected_events = runner_metrics.sent_events_total; - - validate_events_total( - telemetry_events, - &SourceMetricType::EventsReceived, - expected_events, - ) -} - -fn validate_component_received_event_bytes_total( - telemetry_events: &[Event], - runner_metrics: &RunnerMetrics, -) -> Result, Vec> { - // The reciprocal metric for received_event_bytes is sent_event_bytes, - // so the expected value is what the input runner sent. - let expected_bytes = runner_metrics.sent_event_bytes_total; - - validate_bytes_total( - telemetry_events, - &SourceMetricType::EventsReceivedBytes, - expected_bytes, - ) -} - -fn validate_component_received_bytes_total( - telemetry_events: &[Event], - runner_metrics: &RunnerMetrics, -) -> Result, Vec> { - // The reciprocal metric for received_bytes is sent_bytes, - // so the expected value is what the input runner sent. - let expected_bytes = runner_metrics.sent_bytes_total; - - validate_bytes_total( - telemetry_events, - &SourceMetricType::ReceivedBytesTotal, - expected_bytes, - ) -} - -fn validate_component_sent_events_total( - telemetry_events: &[Event], - runner_metrics: &RunnerMetrics, -) -> Result, Vec> { - // The reciprocal metric for events sent is events received, - // so the expected value is what the output runner received. - let expected_events = runner_metrics.received_events_total; - - validate_events_total( - telemetry_events, - &SourceMetricType::SentEventsTotal, - expected_events, - ) -} - -fn validate_component_sent_event_bytes_total( - telemetry_events: &[Event], - runner_metrics: &RunnerMetrics, -) -> Result, Vec> { - // The reciprocal metric for sent_event_bytes is received_event_bytes, - // so the expected value is what the output runner received. - let expected_bytes = runner_metrics.received_event_bytes_total; - - validate_bytes_total( - telemetry_events, - &SourceMetricType::SentEventBytesTotal, - expected_bytes, - ) -} - -fn validate_component_errors_total( - telemetry_events: &[Event], - runner_metrics: &RunnerMetrics, -) -> Result, Vec> { - validate_events_total( - telemetry_events, - &SourceMetricType::ErrorsTotal, - runner_metrics.errors_total, - ) -} diff --git a/src/components/validation/validators/mod.rs b/src/components/validation/validators/mod.rs index b89441d8adc9a..7941a177a527e 100644 --- a/src/components/validation/validators/mod.rs +++ b/src/components/validation/validators/mod.rs @@ -2,6 +2,8 @@ mod component_spec; pub use self::component_spec::ComponentSpecValidator; +use std::fmt::{Display, Formatter}; + use vector_lib::event::Event; use super::{ComponentType, RunnerMetrics, TestCaseExpectation, TestEvent}; @@ -49,3 +51,35 @@ impl From for Box { } } } + +pub enum ComponentMetricType { + EventsReceived, + EventsReceivedBytes, + ReceivedBytesTotal, + SentEventsTotal, + SentBytesTotal, + SentEventBytesTotal, + ErrorsTotal, + DiscardedEventsTotal, +} + +impl ComponentMetricType { + const fn name(&self) -> &'static str { + match self { + ComponentMetricType::EventsReceived => "component_received_events_total", + ComponentMetricType::EventsReceivedBytes => "component_received_event_bytes_total", + ComponentMetricType::ReceivedBytesTotal => "component_received_bytes_total", + ComponentMetricType::SentEventsTotal => "component_sent_events_total", + ComponentMetricType::SentBytesTotal => "component_sent_bytes_total", + ComponentMetricType::SentEventBytesTotal => "component_sent_event_bytes_total", + ComponentMetricType::ErrorsTotal => "component_errors_total", + ComponentMetricType::DiscardedEventsTotal => "component_discarded_events_total", + } + } +} + +impl Display for ComponentMetricType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +}