Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
09c562a
add fix and small refactor
neuronull Jul 11, 2023
1e0e6e7
fix compilation errors
neuronull Jul 11, 2023
63a9581
3 ticks
neuronull Jul 11, 2023
c6af43e
dont compute expected metrics in validator
neuronull Jul 12, 2023
55a3518
cleanup
neuronull Jul 12, 2023
8ec87b3
cleanup
neuronull Jul 12, 2023
4b3b721
clippy
neuronull Jul 12, 2023
f9854bf
feedback tz: sent_eventssssss
neuronull Jul 12, 2023
0577ee6
feedback tz: fix telemetry shutdown finishing logic
neuronull Jul 12, 2023
51e9ab4
3 ticks
neuronull Jul 13, 2023
99a2d20
Merge branch 'master' into neuronull/draft_component_validation_bette…
neuronull Jul 13, 2023
e8cdf11
small reorg to add sinks
neuronull Jul 13, 2023
c460a49
mini refactor of the component spec validators
neuronull Jul 14, 2023
3daced5
attempt to set expected values from the resource
neuronull Jul 14, 2023
b7a7bd3
feedback tz- from not try_from
neuronull Jul 19, 2023
af7e9b2
Merge branch 'neuronull/draft_component_validation_better_validation'…
neuronull Jul 19, 2023
0ce0e25
back to 3 ticks
neuronull Jul 19, 2023
35efd5a
fix incorrect expected values
neuronull Jul 19, 2023
1f4ea02
Even more reduction
neuronull Jul 19, 2023
e8b17af
clippy
neuronull Jul 19, 2023
cdeab8f
add the discarded events total check
neuronull Jul 19, 2023
a0f7a65
Merge branch 'master' into neuronull/draft_component_validation_bette…
neuronull Jul 19, 2023
0a6c056
Merge branch 'neuronull/draft_component_validation_better_validation'…
neuronull Jul 19, 2023
77c110b
Merge branch 'master' into neuronull/component_validation_sink_compon…
neuronull Jul 20, 2023
006db51
workaround the new sync issues
neuronull Jul 20, 2023
604abea
multi config support
neuronull Jul 21, 2023
82faf6c
cleanup
neuronull Jul 21, 2023
4745a2f
Merge branch 'master' into neuronull/component_validation_sink_compon…
neuronull Jan 23, 2024
b8650d0
Merge branch 'neuronull/component_validation_sink_component_spec' int…
neuronull Jan 23, 2024
1a43e8b
check events
neuronull Jan 23, 2024
f54bdac
Merge branch 'master' into neuronull/component_validation_sink_compon…
neuronull Jan 26, 2024
26bde95
Merge branch 'neuronull/component_validation_sink_component_spec' int…
neuronull Jan 26, 2024
f6aa019
partial feedback
neuronull Feb 2, 2024
a2689fe
thought i removed that
neuronull Feb 2, 2024
3ff66e0
use ref
neuronull Feb 5, 2024
73b8689
Merge branch 'neuronull/component_validation_sink_component_spec' int…
neuronull Feb 5, 2024
5d5e019
Merge branch 'master' into neuronull/component_validation_sink_sad_path
neuronull Feb 5, 2024
7d17599
feedback: dont introduce PassThroughFail variant
neuronull Feb 20, 2024
da6267c
feedback: adjust enum variant names for clarity
neuronull Feb 20, 2024
b5a2388
feedback: no idea what I was thinking with `input_codec`
neuronull Feb 20, 2024
94fcdbd
spell check
neuronull Feb 20, 2024
f560911
fr
neuronull Feb 20, 2024
0415759
feedback- update docs
neuronull Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 81 additions & 19 deletions src/components/validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,51 @@ pub enum ComponentConfiguration {
Sink(BoxedSink),
}

/// Component configuration for a test case.
#[derive(Clone)]
pub struct ComponentTestCaseConfig {
config: ComponentConfiguration,
/// If specified, this name must match the `config_name` field of at least one of the test case events.
test_case: Option<String>,
external_resource: Option<ExternalResource>,
}

impl ComponentTestCaseConfig {
pub fn from_source<C: Into<BoxedSource>>(
config: C,
test_case: Option<String>,
external_resource: Option<ExternalResource>,
) -> Self {
Self {
config: ComponentConfiguration::Source(config.into()),
test_case,
external_resource,
}
}
pub fn from_transform<C: Into<BoxedTransform>>(
config: C,
test_case: Option<String>,
external_resource: Option<ExternalResource>,
) -> Self {
Self {
config: ComponentConfiguration::Transform(config.into()),
test_case,
external_resource,
}
}
pub fn from_sink<C: Into<BoxedSink>>(
config: C,
test_case: Option<String>,
external_resource: Option<ExternalResource>,
) -> Self {
Self {
config: ComponentConfiguration::Sink(config.into()),
test_case,
external_resource,
}
}
}

/// Configuration for validating a component.
///
/// This type encompasses all of the required information for configuring and validating a
Expand All @@ -66,46 +111,45 @@ pub enum ComponentConfiguration {
pub struct ValidationConfiguration {
component_name: &'static str,
component_type: ComponentType,
component_configuration: ComponentConfiguration,
external_resource: Option<ExternalResource>,
/// There may be only one `ComponentTestCaseConfig` necessary to execute all test cases, but some cases
/// require more advanced configuration in order to hit the code path desired.
component_configurations: Vec<ComponentTestCaseConfig>,
}

impl ValidationConfiguration {
/// Creates a new `ValidationConfiguration` for a source.
pub fn from_source<C: Into<BoxedSource>>(
pub fn from_source(
component_name: &'static str,
config: C,
external_resource: Option<ExternalResource>,
component_configurations: Vec<ComponentTestCaseConfig>,
) -> Self {
Self {
component_name,
component_type: ComponentType::Source,
component_configuration: ComponentConfiguration::Source(config.into()),
external_resource,
component_configurations,
}
}

/// Creates a new `ValidationConfiguration` for a transform.
pub fn from_transform(component_name: &'static str, config: impl Into<BoxedTransform>) -> Self {
pub fn from_transform(
component_name: &'static str,
component_configurations: Vec<ComponentTestCaseConfig>,
) -> Self {
Self {
component_name,
component_type: ComponentType::Transform,
component_configuration: ComponentConfiguration::Transform(config.into()),
external_resource: None,
component_configurations,
}
}

/// Creates a new `ValidationConfiguration` for a sink.
pub fn from_sink<C: Into<BoxedSink>>(
pub fn from_sink(
component_name: &'static str,
config: C,
external_resource: Option<ExternalResource>,
component_configurations: Vec<ComponentTestCaseConfig>,
) -> Self {
Self {
component_name,
component_type: ComponentType::Sink,
component_configuration: ComponentConfiguration::Sink(config.into()),
external_resource,
component_configurations,
}
}

Expand All @@ -120,13 +164,31 @@ impl ValidationConfiguration {
}

/// Gets the configuration of the component.
pub fn component_configuration(&self) -> ComponentConfiguration {
self.component_configuration.clone()
pub fn component_configurations(&self) -> Vec<ComponentTestCaseConfig> {
self.component_configurations.clone()
}

fn get_comp_test_case(&self, test_case: Option<&String>) -> Option<ComponentTestCaseConfig> {
let empty = String::from("");
let test_case = test_case.unwrap_or(&empty);
self.component_configurations
.clone()
.into_iter()
.find(|c| c.test_case.as_ref().unwrap_or(&String::from("")) == test_case)
}

/// Gets the configuration of the component.
pub fn component_configuration_for_test_case(
&self,
test_case: Option<&String>,
) -> Option<ComponentConfiguration> {
self.get_comp_test_case(test_case).map(|c| c.config)
}

/// Gets the external resource definition for validating the component, if any.
pub fn external_resource(&self) -> Option<ExternalResource> {
self.external_resource.clone()
pub fn external_resource(&self, test_case: Option<&String>) -> Option<ExternalResource> {
self.get_comp_test_case(test_case)
.and_then(|c| c.external_resource)
}
}

Expand Down
73 changes: 54 additions & 19 deletions src/components/validation/resources/event.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use bytes::BytesMut;
use serde::Deserialize;
use serde_json::Value;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;

Expand Down Expand Up @@ -27,7 +28,19 @@ pub enum RawTestEvent {
///
/// For transforms and sinks, generally, the only way to cause an error is if the event itself
/// is malformed in some way, which can be achieved without this test event variant.
Modified { modified: bool, event: EventData },
AlternateEncoder { fail_encoding_of: EventData },

/// The event is created, and the specified field is added to it.
///
/// This allows the ability to hit code paths where some codecs require specific fields to be of specific
/// types, thus allowing us to encode into the input runner without error, but encoding in the component
/// under test can be set up to fail.
WithField {
event: EventData,
name: String,
value: Value,
fail: Option<bool>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What scenarios do we have where we want to inject an additional field but don't want to have the event fail?

If that's common, then we probably want to add a variant to EventData that allows us to deserialize from a map, maybe something like...

#[serde(tag = "type", content = "data")]
enum EventData {
    #[serde(rename = "log")]
    Log(HashMap<String, Value>),
    
    #[serde(untagged)]
    RawLog(String),
}

I think this would be sufficient to allow us to define events like so:

# Raw log message, like we do now:
events:
- my simple log message

# More advanced:
events:
- type: log
  data:
    message: my simple log message
    level: "1"
    
# Bundled with something like an expected decoding failure:
events:
- fail_encoding_of:
    type: log
    data:
      message: my simple log message
      level: "1"

Again, I think this would work, and perhaps more importantly (at least in my mind, but this point is just a loosely-held opinion) it would be somewhat clearer because in order to construct a more advanced event (beyond just the raw message), we wouldn't be limited to just the raw message and a single field to inject. This would end up letting us actually write event data that, for example, where a log event doesn't even have a message field, and so on. A future improvement could then be to make the failure inducing modes be their own enum type -- a variant for "should fail on its own", a variant for "should fail if we mess up the encoding", and so on -- and then that failure type could just be a dedicated field such that the event data definitions might end up looking like:

events:
- failure_mode: invalid_encoding
  type: log
  data:
    message: my simple log message
- failure_mode: invalid_event
  type: log
  data:
    message: my simple log message
    level: "1"

It was always sort of my plan to make EventData work this way, with the single string variant as an escape hatch for defining basic log events without any additional boilerplate while more advanced variants had dedicated variants, especially since we might eventually want to test metrics this way, and we'd need an answer for that.

All of that said, I realize we're focusing primarily on log-specific components right now and so this is purely a suggestion on how we might be able to make this a little more generalized out of the gate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What scenarios do we have where we want to inject an additional field but don't want to have the event fail?

Yeah that is a good point. I think when I made that optional I was thinking about making it flexible so we could hit specific code paths in the components. We don't really have a need for this right now plus as you pointed out it may be more useful for metrics cases.

The changes you suggest, make sense to me.

One thing to note (and I was planning to tell you this today 😅 ) , In my branch to fix the synchronization issues, I actually removed this code path for injecting specific fields. In that branch I ended up changing the error validation for sinks to not rely on the codecs for the errors, and instead generate it from the external resource. I felt this was more of a realistic scenario.

In any case, for all the reasons you mentioned and the ones I added, I'll leave this comment open and we can refer back to it if/when we need to add this functionality.

Thanks!

},
}

#[derive(Clone, Debug, Deserialize)]
Expand All @@ -52,45 +65,52 @@ impl EventData {
/// metrics collection is based on the same event. Namely, one issue that can arise from creating the event
/// from the event data twice (once for the expected and once for actual), it can result in a timestamp in
/// the event which may or may not have the same millisecond precision as it's counterpart.
///
/// For transforms and sinks, generally, the only way to cause an error is if the event itself
/// is malformed in some way, which can be achieved without this test event variant.
#[derive(Clone, Debug, Deserialize)]
#[serde(from = "RawTestEvent")]
#[serde(untagged)]
pub enum TestEvent {
/// The event is used, as-is, without modification.
Passthrough(Event),

/// The event is potentially modified by the external resource.
///
/// The modification made is dependent on the external resource, but this mode is made available
/// for when a test case wants to exercise the failure path, but cannot cause a failure simply
/// by constructing the event in a certain way i.e. adding an invalid field, or removing a
/// required field, or using an invalid field value, and so on.
/// The event is encoded using an encoding that differs from the component's
/// configured encoding, which should cause an error when the event is decoded.
FailWithAlternateEncoder(Event),

/// The event has an additional field injected prior to encoding, which should cause
/// an error when the event is decoded.
///
/// For transforms and sinks, generally, the only way to cause an error is if the event itself
/// is malformed in some way, which can be achieved without this test event variant.
Modified { modified: bool, event: Event },
/// This is useful for testing encodings that have strict schemas and cannot
/// handle arbitrary fields or differing data types for certain fields.
FailWithInjectedField(Event),
}

impl TestEvent {
#[allow(clippy::missing_const_for_fn)] // const cannot run destructor
pub fn into_event(self) -> Event {
match self {
Self::Passthrough(event) => event,
Self::Modified { event, .. } => event,
Self::FailWithAlternateEncoder(event) => event,
Self::FailWithInjectedField(event) => event,
}
}

pub fn get_event(&mut self) -> &mut Event {
match self {
Self::Passthrough(event) => event,
Self::Modified { event, .. } => event,
Self::FailWithAlternateEncoder(event) => event,
Self::FailWithInjectedField(event) => event,
}
}

/// (should_fail, event)
pub fn get(self) -> (bool, Event) {
match self {
Self::Passthrough(event) => (false, event),
Self::Modified { modified, event } => (modified, event),
Self::FailWithAlternateEncoder(event) => (true, event),
Self::FailWithInjectedField(event) => (true, event),
}
}
}
Expand All @@ -104,10 +124,25 @@ impl From<RawTestEvent> for TestEvent {
RawTestEvent::Passthrough(event_data) => {
TestEvent::Passthrough(event_data.into_event())
}
RawTestEvent::Modified { modified, event } => TestEvent::Modified {
modified,
event: event.into_event(),
},
RawTestEvent::AlternateEncoder {
fail_encoding_of: event_data,
} => TestEvent::FailWithAlternateEncoder(event_data.into_event()),
RawTestEvent::WithField {
event,
name,
value,
fail,
} => {
let mut event = event.into_event();
let log_event = event.as_mut_log();
log_event.insert(name.as_str(), value);

if fail.unwrap_or_default() {
TestEvent::FailWithInjectedField(event)
} else {
TestEvent::Passthrough(event)
}
}
}
}
}
Expand All @@ -118,13 +153,13 @@ pub fn encode_test_event(
event: TestEvent,
) {
match event {
TestEvent::Passthrough(event) => {
TestEvent::Passthrough(event) | TestEvent::FailWithInjectedField(event) => {
// Encode the event normally.
encoder
.encode(event, buf)
.expect("should not fail to encode input event");
}
TestEvent::Modified { event, .. } => {
TestEvent::FailWithAlternateEncoder(event) => {
// This is a little fragile, but we check what serializer this encoder uses, and based
// on `Serializer::supports_json`, we choose an opposing codec. For example, if the
// encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise
Expand Down
16 changes: 12 additions & 4 deletions src/components/validation/runner/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,17 @@ pub struct TopologyBuilder {

impl TopologyBuilder {
/// Creates a component topology for the given component configuration.
pub fn from_configuration(configuration: &ValidationConfiguration) -> Self {
let component_configuration = configuration.component_configuration();
match component_configuration {
pub fn from_configuration(
configuration: &ValidationConfiguration,
config_name: Option<&String>,
) -> Result<Self, String> {
let component_configuration = configuration
.component_configuration_for_test_case(config_name)
.ok_or(format!(
"No test case name defined for configuration {:?}.",
config_name
))?;
Ok(match component_configuration {
ComponentConfiguration::Source(source) => {
debug_assert_eq!(configuration.component_type(), ComponentType::Source);
Self::from_source(source)
Expand All @@ -39,7 +47,7 @@ impl TopologyBuilder {
debug_assert_eq!(configuration.component_type(), ComponentType::Sink);
Self::from_sink(sink)
}
}
})
}

/// Creates a component topology for validating a source.
Expand Down
Loading