Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/components/validation/resources/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ pub enum TestEvent {
/// For transforms and sinks, generally, the only way to cause an error is if the event itself
/// is malformed in some way, which can be achieved without this test event variant.
Modified { modified: bool, event: EventData },

/// The event is interrupted by the external resource. This is used to test
/// the failure path where a connection is interrupted while the event is
/// being processed.
Interrupted { interrupted: bool, event: EventData },
}

impl TestEvent {
pub fn into_event(self) -> Event {
match self {
Self::Passthrough(event) => event.into_event(),
Self::Modified { event, .. } => event.into_event(),
Self::Interrupted { event, .. } => event.into_event(),
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions src/components/validation/resources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,14 @@ pub fn encode_test_event(
.encode(event.into_event(), buf)
.expect("should not fail to encode input event");
}
TestEvent::Interrupted {
interrupted: _,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can these two cases be combined since they are identical?

event,
} => {
encoder
.encode(event.into_event(), buf)
.expect("should not fail to encode input event");
}
TestEvent::Modified { event, .. } => {
// This is a little fragile, but we check what serializer this encoder uses, and based
// on `Serializer::supports_json`, we choose an opposing codec. For example, if the
Expand Down
53 changes: 53 additions & 0 deletions src/components/validation/validators/component_spec/sources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub enum SourceMetrics {
SentEventsTotal,
SentEventBytesTotal,
ErrorsTotal,
EventsDropped,
}

impl SourceMetrics {
Expand All @@ -29,6 +30,7 @@ impl SourceMetrics {
SourceMetrics::SentEventsTotal => "component_sent_events_total",
SourceMetrics::SentEventBytesTotal => "component_sent_event_bytes_total",
SourceMetrics::ErrorsTotal => "component_errors_total",
SourceMetrics::EventsDropped => "component_discarded_events_total",
}
}
}
Expand All @@ -49,6 +51,7 @@ pub fn validate_sources(
validate_component_sent_events_total,
validate_component_sent_event_bytes_total,
validate_component_errors_total,
validate_component_errors_dropped_total,
];

for v in validations.iter() {
Expand Down Expand Up @@ -404,3 +407,53 @@ fn validate_component_errors_total(

Ok(vec![format!("{}: {}", SourceMetrics::ErrorsTotal, errors,)])
}

fn validate_component_errors_dropped_total(
_configuration: &ValidationConfiguration,
inputs: &[TestEvent],
_outputs: &[Event],
telemetry_events: &[Event],
) -> Result<Vec<String>, Vec<String>> {
let mut errs: Vec<String> = Vec::new();

let metrics = filter_events_by_metric_and_component(
telemetry_events,
&SourceMetrics::EventsDropped,
TEST_SOURCE_NAME,
);

let errors: i32 = sum_counters(SourceMetrics::EventsDropped, &metrics)? as i32;

let expected_errors: i32 = inputs.iter().fold(0, |acc, i| {
if let TestEvent::Interrupted { .. } = i {
return acc + 1;
}
acc
});

debug!(
"{}: {} errors, expected at least {}",
SourceMetrics::EventsDropped,
errors,
expected_errors,
);

if errors > 0 {
errs.push(format!(
"{}: expected at least {} errors, but received {}",
SourceMetrics::EventsDropped,
expected_errors,
errors
));
}

if !errs.is_empty() {
return Err(errs);
}

Ok(vec![format!(
"{}: {}",
SourceMetrics::EventsDropped,
errors,
)])
}