diff --git a/src/components/validation/mod.rs b/src/components/validation/mod.rs index f1ef68609e59c..d195b48ea99b0 100644 --- a/src/components/validation/mod.rs +++ b/src/components/validation/mod.rs @@ -57,6 +57,51 @@ pub enum ComponentConfiguration { Sink(BoxedSink), } +/// Component configuration for a test case. +#[derive(Clone)] +pub struct ComponentTestCaseConfig { + config: ComponentConfiguration, + /// If specified, this name must match the `config_name` field of at least one of the test case events. + test_case: Option, + external_resource: Option, +} + +impl ComponentTestCaseConfig { + pub fn from_source>( + config: C, + test_case: Option, + external_resource: Option, + ) -> Self { + Self { + config: ComponentConfiguration::Source(config.into()), + test_case, + external_resource, + } + } + pub fn from_transform>( + config: C, + test_case: Option, + external_resource: Option, + ) -> Self { + Self { + config: ComponentConfiguration::Transform(config.into()), + test_case, + external_resource, + } + } + pub fn from_sink>( + config: C, + test_case: Option, + external_resource: Option, + ) -> Self { + Self { + config: ComponentConfiguration::Sink(config.into()), + test_case, + external_resource, + } + } +} + /// Configuration for validating a component. /// /// This type encompasses all of the required information for configuring and validating a @@ -66,46 +111,45 @@ pub enum ComponentConfiguration { pub struct ValidationConfiguration { component_name: &'static str, component_type: ComponentType, - component_configuration: ComponentConfiguration, - external_resource: Option, + /// There may be only one `ComponentTestCaseConfig` necessary to execute all test cases, but some cases + /// require more advanced configuration in order to hit the code path desired. + component_configurations: Vec, } impl ValidationConfiguration { /// Creates a new `ValidationConfiguration` for a source. - pub fn from_source>( + pub fn from_source( component_name: &'static str, - config: C, - external_resource: Option, + component_configurations: Vec, ) -> Self { Self { component_name, component_type: ComponentType::Source, - component_configuration: ComponentConfiguration::Source(config.into()), - external_resource, + component_configurations, } } /// Creates a new `ValidationConfiguration` for a transform. - pub fn from_transform(component_name: &'static str, config: impl Into) -> Self { + pub fn from_transform( + component_name: &'static str, + component_configurations: Vec, + ) -> Self { Self { component_name, component_type: ComponentType::Transform, - component_configuration: ComponentConfiguration::Transform(config.into()), - external_resource: None, + component_configurations, } } /// Creates a new `ValidationConfiguration` for a sink. - pub fn from_sink>( + pub fn from_sink( component_name: &'static str, - config: C, - external_resource: Option, + component_configurations: Vec, ) -> Self { Self { component_name, component_type: ComponentType::Sink, - component_configuration: ComponentConfiguration::Sink(config.into()), - external_resource, + component_configurations, } } @@ -120,13 +164,31 @@ impl ValidationConfiguration { } /// Gets the configuration of the component. - pub fn component_configuration(&self) -> ComponentConfiguration { - self.component_configuration.clone() + pub fn component_configurations(&self) -> Vec { + self.component_configurations.clone() + } + + fn get_comp_test_case(&self, test_case: Option<&String>) -> Option { + let empty = String::from(""); + let test_case = test_case.unwrap_or(&empty); + self.component_configurations + .clone() + .into_iter() + .find(|c| c.test_case.as_ref().unwrap_or(&String::from("")) == test_case) + } + + /// Gets the configuration of the component. + pub fn component_configuration_for_test_case( + &self, + test_case: Option<&String>, + ) -> Option { + self.get_comp_test_case(test_case).map(|c| c.config) } /// Gets the external resource definition for validating the component, if any. - pub fn external_resource(&self) -> Option { - self.external_resource.clone() + pub fn external_resource(&self, test_case: Option<&String>) -> Option { + self.get_comp_test_case(test_case) + .and_then(|c| c.external_resource) } } diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index 96870f6b773af..343466ee9e51c 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -1,5 +1,6 @@ use bytes::BytesMut; use serde::Deserialize; +use serde_json::Value; use snafu::Snafu; use tokio_util::codec::Encoder as _; @@ -27,7 +28,19 @@ pub enum RawTestEvent { /// /// For transforms and sinks, generally, the only way to cause an error is if the event itself /// is malformed in some way, which can be achieved without this test event variant. - Modified { modified: bool, event: EventData }, + AlternateEncoder { fail_encoding_of: EventData }, + + /// The event is created, and the specified field is added to it. + /// + /// This allows the ability to hit code paths where some codecs require specific fields to be of specific + /// types, thus allowing us to encode into the input runner without error, but encoding in the component + /// under test can be set up to fail. + WithField { + event: EventData, + name: String, + value: Value, + fail: Option, + }, } #[derive(Clone, Debug, Deserialize)] @@ -52,6 +65,9 @@ impl EventData { /// metrics collection is based on the same event. Namely, one issue that can arise from creating the event /// from the event data twice (once for the expected and once for actual), it can result in a timestamp in /// the event which may or may not have the same millisecond precision as it's counterpart. +/// +/// For transforms and sinks, generally, the only way to cause an error is if the event itself +/// is malformed in some way, which can be achieved without this test event variant. #[derive(Clone, Debug, Deserialize)] #[serde(from = "RawTestEvent")] #[serde(untagged)] @@ -59,16 +75,16 @@ pub enum TestEvent { /// The event is used, as-is, without modification. Passthrough(Event), - /// The event is potentially modified by the external resource. - /// - /// The modification made is dependent on the external resource, but this mode is made available - /// for when a test case wants to exercise the failure path, but cannot cause a failure simply - /// by constructing the event in a certain way i.e. adding an invalid field, or removing a - /// required field, or using an invalid field value, and so on. + /// The event is encoded using an encoding that differs from the component's + /// configured encoding, which should cause an error when the event is decoded. + FailWithAlternateEncoder(Event), + + /// The event has an additional field injected prior to encoding, which should cause + /// an error when the event is decoded. /// - /// For transforms and sinks, generally, the only way to cause an error is if the event itself - /// is malformed in some way, which can be achieved without this test event variant. - Modified { modified: bool, event: Event }, + /// This is useful for testing encodings that have strict schemas and cannot + /// handle arbitrary fields or differing data types for certain fields. + FailWithInjectedField(Event), } impl TestEvent { @@ -76,21 +92,25 @@ impl TestEvent { pub fn into_event(self) -> Event { match self { Self::Passthrough(event) => event, - Self::Modified { event, .. } => event, + Self::FailWithAlternateEncoder(event) => event, + Self::FailWithInjectedField(event) => event, } } pub fn get_event(&mut self) -> &mut Event { match self { Self::Passthrough(event) => event, - Self::Modified { event, .. } => event, + Self::FailWithAlternateEncoder(event) => event, + Self::FailWithInjectedField(event) => event, } } + /// (should_fail, event) pub fn get(self) -> (bool, Event) { match self { Self::Passthrough(event) => (false, event), - Self::Modified { modified, event } => (modified, event), + Self::FailWithAlternateEncoder(event) => (true, event), + Self::FailWithInjectedField(event) => (true, event), } } } @@ -104,10 +124,25 @@ impl From for TestEvent { RawTestEvent::Passthrough(event_data) => { TestEvent::Passthrough(event_data.into_event()) } - RawTestEvent::Modified { modified, event } => TestEvent::Modified { - modified, - event: event.into_event(), - }, + RawTestEvent::AlternateEncoder { + fail_encoding_of: event_data, + } => TestEvent::FailWithAlternateEncoder(event_data.into_event()), + RawTestEvent::WithField { + event, + name, + value, + fail, + } => { + let mut event = event.into_event(); + let log_event = event.as_mut_log(); + log_event.insert(name.as_str(), value); + + if fail.unwrap_or_default() { + TestEvent::FailWithInjectedField(event) + } else { + TestEvent::Passthrough(event) + } + } } } } @@ -118,13 +153,13 @@ pub fn encode_test_event( event: TestEvent, ) { match event { - TestEvent::Passthrough(event) => { + TestEvent::Passthrough(event) | TestEvent::FailWithInjectedField(event) => { // Encode the event normally. encoder .encode(event, buf) .expect("should not fail to encode input event"); } - TestEvent::Modified { event, .. } => { + TestEvent::FailWithAlternateEncoder(event) => { // This is a little fragile, but we check what serializer this encoder uses, and based // on `Serializer::supports_json`, we choose an opposing codec. For example, if the // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise diff --git a/src/components/validation/runner/config.rs b/src/components/validation/runner/config.rs index ed721fea929f0..f2adb90765ce1 100644 --- a/src/components/validation/runner/config.rs +++ b/src/components/validation/runner/config.rs @@ -24,9 +24,17 @@ pub struct TopologyBuilder { impl TopologyBuilder { /// Creates a component topology for the given component configuration. - pub fn from_configuration(configuration: &ValidationConfiguration) -> Self { - let component_configuration = configuration.component_configuration(); - match component_configuration { + pub fn from_configuration( + configuration: &ValidationConfiguration, + config_name: Option<&String>, + ) -> Result { + let component_configuration = configuration + .component_configuration_for_test_case(config_name) + .ok_or(format!( + "No test case name defined for configuration {:?}.", + config_name + ))?; + Ok(match component_configuration { ComponentConfiguration::Source(source) => { debug_assert_eq!(configuration.component_type(), ComponentType::Source); Self::from_source(source) @@ -39,7 +47,7 @@ impl TopologyBuilder { debug_assert_eq!(configuration.component_type(), ComponentType::Sink); Self::from_sink(sink) } - } + }) } /// Creates a component topology for validating a source. diff --git a/src/components/validation/runner/mod.rs b/src/components/validation/runner/mod.rs index 5014edc5aaa40..91048a1e4438e 100644 --- a/src/components/validation/runner/mod.rs +++ b/src/components/validation/runner/mod.rs @@ -225,7 +225,10 @@ impl Runner { // We then finalize the topology builder to get our actual `ConfigBuilder`, as well as // any controlled edges (channel sender/receiver to the aforementioned filler // components) and a telemetry client for collecting internal telemetry. - let topology_builder = TopologyBuilder::from_configuration(&self.configuration); + let topology_builder = TopologyBuilder::from_configuration( + &self.configuration, + test_case.config_name.as_ref(), + )?; let (config_builder, controlled_edges, telemetry_collector) = topology_builder .finalize( &input_task_coordinator, @@ -252,6 +255,7 @@ impl Runner { // controlled output edge, which means we then need a server task listening for the // events sent by that sink. let (runner_input, runner_output, maybe_runner_encoder) = build_external_resource( + test_case.config_name.as_ref(), &self.configuration, &input_task_coordinator, &output_task_coordinator, @@ -308,14 +312,14 @@ impl Runner { input_tx, &runner_metrics, maybe_runner_encoder.as_ref().cloned(), - self.configuration.component_type == ComponentType::Source, + self.configuration.component_type, ); let output_driver = spawn_output_driver( output_rx, &runner_metrics, maybe_runner_encoder.as_ref().cloned(), - self.configuration.component_type == ComponentType::Sink, + self.configuration.component_type, ); // At this point, the component topology is running, and all input/output/telemetry @@ -364,6 +368,7 @@ impl Runner { name: test_name, expectation, events: input_events, + .. } = test_case; let telemetry_events = telemetry_collector.collect().await; @@ -432,16 +437,21 @@ fn load_component_test_cases(test_case_data_path: PathBuf) -> Result, configuration: &ValidationConfiguration, input_task_coordinator: &TaskCoordinator, output_task_coordinator: &TaskCoordinator, runner_metrics: &Arc>, ) -> Result<(RunnerInput, RunnerOutput, Option>), vector_lib::Error> { let component_type = configuration.component_type(); - let maybe_external_resource = configuration.external_resource(); - let maybe_encoder = maybe_external_resource + let maybe_external_resource = configuration.external_resource(test_case); + + let resource_codec = maybe_external_resource .as_ref() - .map(|resource| resource.codec.into_encoder()); + .map(|resource| resource.codec.clone()); + + let maybe_encoder = resource_codec.as_ref().map(|codec| codec.into_encoder()); + match component_type { ComponentType::Source => { // As an external resource for a source, we create a channel that the validation runner @@ -535,7 +545,7 @@ fn spawn_input_driver( input_tx: Sender, runner_metrics: &Arc>, mut maybe_encoder: Option>, - is_source: bool, + component_type: ComponentType, ) -> JoinHandle<()> { let input_runner_metrics = Arc::clone(runner_metrics); @@ -556,13 +566,13 @@ fn spawn_input_driver( // the controlled edge (vector source) adds metadata to the event when it is received. // thus we need to add it here so the expected values for the comparisons on transforms // and sinks are accurate. - if !is_source { + if component_type != ComponentType::Source { if let Event::Log(ref mut log) = input_event.get_event() { log_namespace.insert_standard_vector_source_metadata(log, "vector", now); } } - let (modified, event) = input_event.clone().get(); + let (failure_case, event) = input_event.clone().get(); if let Some(encoder) = maybe_encoder.as_mut() { let mut buffer = BytesMut::new(); @@ -572,9 +582,15 @@ fn spawn_input_driver( } // account for failure case - if modified { + if failure_case { input_runner_metrics.errors_total += 1; - } else { + // TODO: this assumption may need to be made configurable at some point + if component_type == ComponentType::Sink { + input_runner_metrics.discarded_events_total += 1; + } + } + + if !failure_case || component_type == ComponentType::Sink { input_runner_metrics.sent_events_total += 1; // The event is wrapped in a Vec to match the actual event storage in @@ -591,7 +607,7 @@ fn spawn_output_driver( mut output_rx: Receiver>, runner_metrics: &Arc>, maybe_encoder: Option>, - is_sink: bool, + component_type: ComponentType, ) -> JoinHandle> { let output_runner_metrics = Arc::clone(runner_metrics); @@ -605,7 +621,7 @@ fn spawn_output_driver( let mut output_runner_metrics = output_runner_metrics.lock().await; for output_event in events { - if !is_sink { + if component_type != ComponentType::Sink { // The event is wrapped in a Vec to match the actual event storage in // the real topology output_runner_metrics.received_event_bytes_total += diff --git a/src/components/validation/test_case.rs b/src/components/validation/test_case.rs index 360c6513161b0..a281f6e5883f0 100644 --- a/src/components/validation/test_case.rs +++ b/src/components/validation/test_case.rs @@ -26,6 +26,7 @@ pub enum TestCaseExpectation { #[derive(Deserialize)] pub struct TestCase { pub name: String, + pub config_name: Option, pub expectation: TestCaseExpectation, pub events: Vec, } diff --git a/src/components/validation/validators/component_spec/mod.rs b/src/components/validation/validators/component_spec/mod.rs index 799f27a394de3..3d055ae6993ce 100644 --- a/src/components/validation/validators/component_spec/mod.rs +++ b/src/components/validation/validators/component_spec/mod.rs @@ -204,6 +204,7 @@ fn filter_events_by_metric_and_component<'a>( }) .filter(|&m| { if m.name() == metric.to_string() { + info!("{}", m); if let Some(tags) = m.tags() { if tags.get("component_id").unwrap_or("") == component_id { return true; diff --git a/src/sinks/http/config.rs b/src/sinks/http/config.rs index cf6d7264fbe32..62ee534ef1e6f 100644 --- a/src/sinks/http/config.rs +++ b/src/sinks/http/config.rs @@ -5,11 +5,12 @@ use hyper::Body; use indexmap::IndexMap; use vector_lib::codecs::{ encoding::{Framer, Serializer}, - CharacterDelimitedEncoder, + CharacterDelimitedEncoder, GelfSerializerConfig, }; use crate::{ codecs::{EncodingConfigWithFraming, SinkType}, + components::validation::ComponentTestCaseConfig, http::{Auth, HttpClient, MaybeAuth}, sinks::{ prelude::*, @@ -311,33 +312,80 @@ impl ValidatableComponent for HttpSinkConfig { use std::str::FromStr; use vector_lib::codecs::{JsonSerializerConfig, MetricTagValues}; - let config = Self { - uri: UriSerde::from_str("http://127.0.0.1:9000/endpoint") - .expect("should never fail to parse"), - method: HttpMethod::Post, - encoding: EncodingConfigWithFraming::new( - None, - JsonSerializerConfig::new(MetricTagValues::Full).into(), - Transformer::default(), - ), - auth: None, - headers: None, - compression: Compression::default(), - batch: BatchConfig::default(), - request: RequestConfig::default(), - tls: None, - acknowledgements: AcknowledgementsConfig::default(), - payload_prefix: String::new(), - payload_suffix: String::new(), - }; + let happy_encoder = EncodingConfigWithFraming::new( + None, + JsonSerializerConfig::new(MetricTagValues::Full).into(), + Transformer::default(), + ); + + fn get_config(encoding: EncodingConfigWithFraming) -> HttpSinkConfig { + HttpSinkConfig { + uri: UriSerde::from_str("http://127.0.0.1:9000/endpoint") + .expect("should never fail to parse"), + method: HttpMethod::Post, + encoding, + auth: None, + headers: None, + compression: Compression::default(), + batch: BatchConfig::default(), + request: RequestConfig::default(), + tls: None, + acknowledgements: AcknowledgementsConfig::default(), + payload_prefix: String::new(), + payload_suffix: String::new(), + } + } - let external_resource = ExternalResource::new( - ResourceDirection::Push, - HttpResourceConfig::from_parts(config.uri.uri.clone(), Some(config.method.into())), - config.encoding.clone(), + fn get_external_resource( + config: &HttpSinkConfig, + encoding: Option, + ) -> ExternalResource { + ExternalResource::new( + ResourceDirection::Push, + HttpResourceConfig::from_parts(config.uri.uri.clone(), Some(config.method.into())), + if let Some(encoding) = encoding { + encoding + } else { + config.encoding.clone() + }, + ) + } + + let happy_config = get_config(happy_encoder.clone()); + + let happy_external_resource = get_external_resource(&happy_config, None); + + // this config uses the Gelf serializer, which requires the "level" field to + // be an integer + let sad_config = get_config(EncodingConfigWithFraming::new( + None, + GelfSerializerConfig::new().into(), + Transformer::default(), + )); + + let sad_external_resource = get_external_resource( + &happy_config, + // the external resource needs to use an encoder that actually works, in order to + // get the event into the topology successfully + Some(happy_encoder), ); - ValidationConfiguration::from_sink(Self::NAME, config, Some(external_resource)) + ValidationConfiguration::from_sink( + Self::NAME, + vec![ + ComponentTestCaseConfig::from_sink( + happy_config, + None, + Some(happy_external_resource), + ), + // this config only runs with the test case "encoding_error" in the yaml file. + ComponentTestCaseConfig::from_sink( + sad_config, + Some("encoding_error".to_owned()), + Some(sad_external_resource), + ), + ], + ) } } diff --git a/src/sources/http_client/client.rs b/src/sources/http_client/client.rs index 68304e40ca6f6..3093847904101 100644 --- a/src/sources/http_client/client.rs +++ b/src/sources/http_client/client.rs @@ -253,7 +253,14 @@ impl ValidatableComponent for HttpClientConfig { config.get_decoding_config(None), ); - ValidationConfiguration::from_source(Self::NAME, config, Some(external_resource)) + ValidationConfiguration::from_source( + Self::NAME, + vec![ComponentTestCaseConfig::from_source( + config, + None, + Some(external_resource), + )], + ) } } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index a752358ad968c..255c4231d7bfd 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -292,7 +292,14 @@ impl ValidatableComponent for SimpleHttpConfig { .expect("should not fail to get decoding config"), ); - ValidationConfiguration::from_source(Self::NAME, config, Some(external_resource)) + ValidationConfiguration::from_source( + Self::NAME, + vec![ComponentTestCaseConfig::from_source( + config, + None, + Some(external_resource), + )], + ) } } diff --git a/tests/validation/components/sinks/http.yaml b/tests/validation/components/sinks/http.yaml index 7f7c18db35ad4..3d3525766d74f 100644 --- a/tests/validation/components/sinks/http.yaml +++ b/tests/validation/components/sinks/http.yaml @@ -4,3 +4,11 @@ - simple message 1 - simple message 2 - simple message 3 +- name: sad path + config_name: encoding_error + expectation: failure + events: + - event: simple message with the invalid data type for encoder + name: level + value: "1" + fail: true diff --git a/tests/validation/components/sources/http_client.yaml b/tests/validation/components/sources/http_client.yaml index 08424a9cde089..437a7d680b566 100644 --- a/tests/validation/components/sources/http_client.yaml +++ b/tests/validation/components/sources/http_client.yaml @@ -9,5 +9,4 @@ events: - simple message 1 - simple message 2 - - modified: true - event: simple message with the wrong encoding + - fail_encoding_of: simple message with the wrong encoding diff --git a/tests/validation/components/sources/http_server.yaml b/tests/validation/components/sources/http_server.yaml index 08424a9cde089..437a7d680b566 100644 --- a/tests/validation/components/sources/http_server.yaml +++ b/tests/validation/components/sources/http_server.yaml @@ -9,5 +9,4 @@ events: - simple message 1 - simple message 2 - - modified: true - event: simple message with the wrong encoding + - fail_encoding_of: simple message with the wrong encoding