Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
09c562a
add fix and small refactor
neuronull Jul 11, 2023
1e0e6e7
fix compilation errors
neuronull Jul 11, 2023
63a9581
3 ticks
neuronull Jul 11, 2023
c6af43e
dont compute expected metrics in validator
neuronull Jul 12, 2023
55a3518
cleanup
neuronull Jul 12, 2023
8ec87b3
cleanup
neuronull Jul 12, 2023
4b3b721
clippy
neuronull Jul 12, 2023
f9854bf
feedback tz: sent_eventssssss
neuronull Jul 12, 2023
0577ee6
feedback tz: fix telemetry shutdown finishing logic
neuronull Jul 12, 2023
51e9ab4
3 ticks
neuronull Jul 13, 2023
99a2d20
Merge branch 'master' into neuronull/draft_component_validation_bette…
neuronull Jul 13, 2023
e8cdf11
small reorg to add sinks
neuronull Jul 13, 2023
c460a49
mini refactor of the component spec validators
neuronull Jul 14, 2023
3daced5
attempt to set expected values from the resource
neuronull Jul 14, 2023
b7a7bd3
feedback tz- from not try_from
neuronull Jul 19, 2023
af7e9b2
Merge branch 'neuronull/draft_component_validation_better_validation'…
neuronull Jul 19, 2023
0ce0e25
back to 3 ticks
neuronull Jul 19, 2023
35efd5a
fix incorrect expected values
neuronull Jul 19, 2023
1f4ea02
Even more reduction
neuronull Jul 19, 2023
e8b17af
clippy
neuronull Jul 19, 2023
cdeab8f
add the discarded events total check
neuronull Jul 19, 2023
a0f7a65
Merge branch 'master' into neuronull/draft_component_validation_bette…
neuronull Jul 19, 2023
0a6c056
Merge branch 'neuronull/draft_component_validation_better_validation'…
neuronull Jul 19, 2023
77c110b
Merge branch 'master' into neuronull/component_validation_sink_compon…
neuronull Jul 20, 2023
006db51
workaround the new sync issues
neuronull Jul 20, 2023
604abea
multi config support
neuronull Jul 21, 2023
82faf6c
cleanup
neuronull Jul 21, 2023
4745a2f
Merge branch 'master' into neuronull/component_validation_sink_compon…
neuronull Jan 23, 2024
b8650d0
Merge branch 'neuronull/component_validation_sink_component_spec' int…
neuronull Jan 23, 2024
1a43e8b
check events
neuronull Jan 23, 2024
f54bdac
Merge branch 'master' into neuronull/component_validation_sink_compon…
neuronull Jan 26, 2024
26bde95
Merge branch 'neuronull/component_validation_sink_component_spec' int…
neuronull Jan 26, 2024
f6aa019
partial feedback
neuronull Feb 2, 2024
a2689fe
thought i removed that
neuronull Feb 2, 2024
3ff66e0
use ref
neuronull Feb 5, 2024
73b8689
Merge branch 'neuronull/component_validation_sink_component_spec' int…
neuronull Feb 5, 2024
5d5e019
Merge branch 'master' into neuronull/component_validation_sink_sad_path
neuronull Feb 5, 2024
7d17599
feedback: dont introduce PassThroughFail variant
neuronull Feb 20, 2024
da6267c
feedback: adjust enum variant names for clarity
neuronull Feb 20, 2024
b5a2388
feedback: no idea what I was thinking with `input_codec`
neuronull Feb 20, 2024
94fcdbd
spell check
neuronull Feb 20, 2024
f560911
fr
neuronull Feb 20, 2024
c31cba5
fix sync issues
neuronull Feb 22, 2024
ce4bf5f
remove unused enum variant
neuronull Feb 22, 2024
0415759
feedback- update docs
neuronull Feb 22, 2024
68559cf
Merge branch 'neuronull/component_validation_sink_sad_path' into neur…
neuronull Feb 22, 2024
f0dcb41
check_events
neuronull Feb 22, 2024
43ea0fe
touchup
neuronull Feb 22, 2024
472e7c1
spell checker
neuronull Feb 22, 2024
5a45287
Merge branch 'master' into neuronull/OPA-1048_component_validation_fi…
neuronull Feb 22, 2024
0fb8eb9
merge leftover
neuronull Feb 22, 2024
be38808
feedback: log formating
neuronull Feb 28, 2024
64303c1
feedback- better approach to driving shutdown
neuronull Feb 28, 2024
173b2e0
give a generous timeout
neuronull Feb 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,9 @@ enterprise-tests = [
]

component-validation-runner = ["dep:tonic", "sources-internal_logs", "sources-internal_metrics", "sources-vector", "sinks-vector"]
component-validation-tests = ["component-validation-runner", "sources", "transforms", "sinks"]
# For now, only include components that implement ValidatableComponent.
# In the future, this can change to simply reference the targets `sources`, `transforms`, `sinks`
component-validation-tests = ["component-validation-runner", "sources-http_client", "sources-http_server", "sinks-http"]

# Grouping together features for benchmarks. We exclude the API client due to it causing the build process to run out
# of memory when those additional dependencies are built in CI.
Expand Down
67 changes: 30 additions & 37 deletions src/components/validation/resources/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use bytes::BytesMut;
use serde::Deserialize;
use serde_json::Value;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;

Expand Down Expand Up @@ -30,16 +29,9 @@ pub enum RawTestEvent {
/// is malformed in some way, which can be achieved without this test event variant.
AlternateEncoder { fail_encoding_of: EventData },

/// The event is created, and the specified field is added to it.
///
/// This allows the ability to hit code paths where some codecs require specific fields to be of specific
/// types, thus allowing us to encode into the input runner without error, but encoding in the component
/// under test can be set up to fail.
WithField {
event: EventData,
name: String,
value: Value,
fail: Option<bool>,
/// The event will be rejected by the external resource.
ResourceReject {
external_resource_rejects: EventData,
},
}

Expand Down Expand Up @@ -79,12 +71,9 @@ pub enum TestEvent {
/// configured encoding, which should cause an error when the event is decoded.
FailWithAlternateEncoder(Event),

/// The event has an additional field injected prior to encoding, which should cause
/// an error when the event is decoded.
///
/// This is useful for testing encodings that have strict schemas and cannot
/// handle arbitrary fields or differing data types for certain fields.
FailWithInjectedField(Event),
/// The event encodes successfully but when the external resource receives that event, it should
/// throw a failure.
FailWithExternalResource(Event),
}

impl TestEvent {
Expand All @@ -93,15 +82,15 @@ impl TestEvent {
match self {
Self::Passthrough(event) => event,
Self::FailWithAlternateEncoder(event) => event,
Self::FailWithInjectedField(event) => event,
Self::FailWithExternalResource(event) => event,
}
}

pub fn get_event(&mut self) -> &mut Event {
match self {
Self::Passthrough(event) => event,
Self::FailWithAlternateEncoder(event) => event,
Self::FailWithInjectedField(event) => event,
Self::FailWithExternalResource(event) => event,
}
}

Expand All @@ -110,7 +99,24 @@ impl TestEvent {
match self {
Self::Passthrough(event) => (false, event),
Self::FailWithAlternateEncoder(event) => (true, event),
Self::FailWithInjectedField(event) => (true, event),
Self::FailWithExternalResource(event) => (true, event),
}
}

/// True if the event should fail, false otherwise.
pub const fn should_fail(&self) -> bool {
match self {
Self::Passthrough(_) => false,
Self::FailWithAlternateEncoder(_) | Self::FailWithExternalResource(_) => true,
}
}

/// True if the event should be rejected by the external resource in order to
/// trigger a failure path.
pub const fn should_reject(&self) -> bool {
match self {
Self::Passthrough(_) | Self::FailWithAlternateEncoder(_) => false,
Self::FailWithExternalResource(_) => true,
}
}
}
Expand All @@ -127,22 +133,9 @@ impl From<RawTestEvent> for TestEvent {
RawTestEvent::AlternateEncoder {
fail_encoding_of: event_data,
} => TestEvent::FailWithAlternateEncoder(event_data.into_event()),
RawTestEvent::WithField {
event,
name,
value,
fail,
} => {
let mut event = event.into_event();
let log_event = event.as_mut_log();
log_event.insert(name.as_str(), value);

if fail.unwrap_or_default() {
TestEvent::FailWithInjectedField(event)
} else {
TestEvent::Passthrough(event)
}
}
RawTestEvent::ResourceReject {
external_resource_rejects: event_data,
} => TestEvent::FailWithExternalResource(event_data.into_event()),
}
}
}
Expand All @@ -153,7 +146,7 @@ pub fn encode_test_event(
event: TestEvent,
) {
match event {
TestEvent::Passthrough(event) | TestEvent::FailWithInjectedField(event) => {
TestEvent::Passthrough(event) | TestEvent::FailWithExternalResource(event) => {
// Encode the event normally.
encoder
.encode(event, buf)
Expand Down
105 changes: 68 additions & 37 deletions src/components/validation/resources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl HttpResourceConfig {
codec: ResourceCodec,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
input_events: Vec<TestEvent>,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
) -> vector_lib::Result<()> {
match direction {
Expand All @@ -77,9 +78,14 @@ impl HttpResourceConfig {
task_coordinator,
)),
// The sink will push data to us.
ResourceDirection::Push => {
spawn_output_http_server(self, codec, output_tx, task_coordinator, runner_metrics)
}
ResourceDirection::Push => spawn_output_http_server(
self,
codec,
output_tx,
task_coordinator,
input_events,
runner_metrics,
),
}
}
}
Expand Down Expand Up @@ -132,10 +138,11 @@ fn spawn_input_http_server(
// events and working with the HTTP server as they're consumed.
let resource_started = task_coordinator.track_started();
let resource_completed = task_coordinator.track_completed();
let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();

tokio::spawn(async move {
resource_started.mark_as_done();
debug!("HTTP server external input resource started.");
info!("HTTP server external input resource started.");

let mut input_finished = false;

Expand All @@ -152,7 +159,7 @@ fn spawn_input_http_server(
outstanding_events.push_back(event);
},
None => {
trace!("HTTP server external input resource input is finished.");
info!("HTTP server external input resource input is finished.");
input_finished = true;
},
},
Expand All @@ -175,16 +182,18 @@ fn spawn_input_http_server(
}
// Mark ourselves as completed now that we've sent all inputs to the source, and
// additionally signal the HTTP server to also gracefully shutdown.
_ = http_server_shutdown_tx.send(());
info!("HTTP server external input resource signalling ready for shutdown.");

// TODO - currently we are getting lucky in the testing of `http_client` source... if the source tries to query
// this server but we have already shut down the thread, then it will generate an error which can throw off our error
// validation.
// I think the solution involves adding synchronization to wait here for the runner to tell us to shutdown.
// Wait for the runner to signal us to shutdown
resource_shutdown_rx.wait().await;

// Shutdown the server
_ = http_server_shutdown_tx.send(());

info!("HTTP server external input resource marking as done.");
resource_completed.mark_as_done();

debug!("HTTP server external input resource completed.");
info!("HTTP server external input resource completed.");
});
}

Expand All @@ -205,7 +214,7 @@ fn spawn_input_http_client(
// Mark ourselves as started. We don't actually do anything until we get our first input
// message, though.
started.mark_as_done();
debug!("HTTP client external input resource started.");
info!("HTTP client external input resource started.");

let client = Client::builder().build_http::<Body>();
let request_uri = config.uri;
Expand Down Expand Up @@ -238,7 +247,7 @@ fn spawn_input_http_client(
// Mark ourselves as completed now that we've sent all inputs to the source.
completed.mark_as_done();

debug!("HTTP client external input resource completed.");
info!("HTTP client external input resource completed.");
});
}

Expand All @@ -249,6 +258,7 @@ fn spawn_output_http_server(
codec: ResourceCodec,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
input_events: Vec<TestEvent>,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
) -> vector_lib::Result<()> {
// This HTTP server will wait for events to be sent by a sink, and collect them and send them on
Expand All @@ -257,6 +267,8 @@ fn spawn_output_http_server(
// First, we'll build and spawn our HTTP server.
let decoder = codec.into_decoder()?;

let should_reject = input_events.iter().filter(|te| te.should_reject()).count() > 0;

let (_, http_server_shutdown_tx) = spawn_http_server(
task_coordinator,
&config,
Expand All @@ -273,28 +285,41 @@ fn spawn_output_http_server(
loop {
match decoder.decode_eof(&mut body) {
Ok(Some((events, byte_size))) => {
let mut output_runner_metrics =
output_runner_metrics.lock().await;
debug!("HTTP server external output resource decoded {byte_size} bytes.");

// Update the runner metrics for the received events. This will later
// be used in the Validators, as the "expected" case.
output_runner_metrics.received_bytes_total += byte_size as u64;

output_runner_metrics.received_events_total +=
events.len() as u64;

events.iter().for_each(|event| {
output_runner_metrics.received_event_bytes_total +=
event.estimated_json_encoded_size_of().get() as u64;
});

output_tx
.send(events.to_vec())
.await
.expect("should not fail to send output event");
if should_reject {
info!("HTTP server external output resource decoded {byte_size} bytes but test case configured to reject.");
} else {
let mut output_runner_metrics =
output_runner_metrics.lock().await;
info!("HTTP server external output resource decoded {byte_size} bytes.");

// Update the runner metrics for the received events. This will later
// be used in the Validators, as the "expected" case.
output_runner_metrics.received_bytes_total +=
byte_size as u64;

output_runner_metrics.received_events_total +=
events.len() as u64;

events.iter().for_each(|event| {
output_runner_metrics.received_event_bytes_total +=
event.estimated_json_encoded_size_of().get() as u64;
});

output_tx
.send(events.to_vec())
.await
.expect("should not fail to send output event");
}
}
Ok(None) => {
if should_reject {
// This status code is not retried and should result in the component under test
// emitting error events
return StatusCode::BAD_REQUEST.into_response();
} else {
return StatusCode::OK.into_response();
}
}
Ok(None) => return StatusCode::OK.into_response(),
Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
}
}
Expand All @@ -312,14 +337,20 @@ fn spawn_output_http_server(

tokio::spawn(async move {
resource_started.mark_as_done();
debug!("HTTP server external output resource started.");
info!("HTTP server external output resource started.");

// Wait for the runner to tell us to shutdown
resource_shutdown_rx.wait().await;
_ = http_server_shutdown_tx.send(());

// signal the server to shutdown
let _ = http_server_shutdown_tx.send(());

// mark ourselves as done
resource_completed.mark_as_done();

debug!("HTTP server external output resource completed.");
info!("HTTP server external output resource completed.");
});

Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ impl ExternalResource {
self,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
input_events: Vec<TestEvent>,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
) -> vector_lib::Result<()> {
match self.definition {
Expand All @@ -348,6 +349,7 @@ impl ExternalResource {
self.codec,
output_tx,
task_coordinator,
input_events,
runner_metrics,
),
}
Expand Down
Loading