Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
09c562a
add fix and small refactor
neuronull Jul 11, 2023
1e0e6e7
fix compilation errors
neuronull Jul 11, 2023
63a9581
3 ticks
neuronull Jul 11, 2023
c6af43e
dont compute expected metrics in validator
neuronull Jul 12, 2023
55a3518
cleanup
neuronull Jul 12, 2023
8ec87b3
cleanup
neuronull Jul 12, 2023
4b3b721
clippy
neuronull Jul 12, 2023
f9854bf
feedback tz: sent_eventssssss
neuronull Jul 12, 2023
0577ee6
feedback tz: fix telemetry shutdown finishing logic
neuronull Jul 12, 2023
51e9ab4
3 ticks
neuronull Jul 13, 2023
99a2d20
Merge branch 'master' into neuronull/draft_component_validation_bette…
neuronull Jul 13, 2023
e8cdf11
small reorg to add sinks
neuronull Jul 13, 2023
c460a49
mini refactor of the component spec validators
neuronull Jul 14, 2023
3daced5
attempt to set expected values from the resource
neuronull Jul 14, 2023
b7a7bd3
feedback tz- from not try_from
neuronull Jul 19, 2023
af7e9b2
Merge branch 'neuronull/draft_component_validation_better_validation'…
neuronull Jul 19, 2023
0ce0e25
back to 3 ticks
neuronull Jul 19, 2023
35efd5a
fix incorrect expected values
neuronull Jul 19, 2023
1f4ea02
Even more reduction
neuronull Jul 19, 2023
e8b17af
clippy
neuronull Jul 19, 2023
cdeab8f
add the discarded events total check
neuronull Jul 19, 2023
a0f7a65
Merge branch 'master' into neuronull/draft_component_validation_bette…
neuronull Jul 19, 2023
0a6c056
Merge branch 'neuronull/draft_component_validation_better_validation'…
neuronull Jul 19, 2023
77c110b
Merge branch 'master' into neuronull/component_validation_sink_compon…
neuronull Jul 20, 2023
006db51
workaround the new sync issues
neuronull Jul 20, 2023
4745a2f
Merge branch 'master' into neuronull/component_validation_sink_compon…
neuronull Jan 23, 2024
1a43e8b
check events
neuronull Jan 23, 2024
f54bdac
Merge branch 'master' into neuronull/component_validation_sink_compon…
neuronull Jan 26, 2024
f6aa019
partial feedback
neuronull Feb 2, 2024
a2689fe
thought i removed that
neuronull Feb 2, 2024
3ff66e0
use ref
neuronull Feb 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/components/validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ pub use self::runner::*;
pub use self::test_case::{TestCase, TestCaseExpectation};
pub use self::validators::*;

/// Fixed component identifiers used by the component validation framework.
///
/// Every constant is a plain `&'static str`; the values are fixed strings.
pub mod component_names {
    // NOTE(review): presumably the IDs assigned to the component under test, one per
    // component type -- confirm against the topology builder.
    pub const TEST_SINK_NAME: &str = "test_sink";
    pub const TEST_SOURCE_NAME: &str = "test_source";
    pub const TEST_TRANSFORM_NAME: &str = "test_transform";

    // NOTE(review): presumably the IDs of the helper components on the input and
    // output edges of the validation topology -- confirm against the topology builder.
    pub const TEST_INPUT_SOURCE_NAME: &str = "input_source";
    pub const TEST_OUTPUT_SINK_NAME: &str = "output_sink";
}

/// Component types that can be validated.
// TODO: We should centralize this in `vector-common` or something, where both this code and the
// configuration schema stuff (namely the proc macros that use this) can share it.
Expand Down Expand Up @@ -173,15 +181,16 @@ macro_rules! register_validatable_component {
/// Input and Output runners populate this structure as they send and receive events.
/// The structure is passed into the validator to use as the expected values for the
/// metrics that the components under test actually output.
// A single derive list: deriving `Default` twice would produce conflicting impls.
// `Default` zero-initializes every counter; `Debug` allows the struct to be logged.
#[derive(Default, Debug)]
pub struct RunnerMetrics {
    /// Number of events received.
    pub received_events_total: u64,
    /// Sum of the estimated JSON-encoded size of each received event.
    pub received_event_bytes_total: u64,
    /// Raw byte count received, as reported by the decoder.
    pub received_bytes_total: u64,
    // NOTE(review): presumably the sending-side counterpart of `received_bytes_total`
    // -- confirm against the input runner.
    pub sent_bytes_total: u64,
    // NOTE(review): presumably the sending-side counterpart of
    // `received_event_bytes_total` -- confirm against the input runner.
    pub sent_event_bytes_total: u64,
    // NOTE(review): presumably the sending-side counterpart of `received_events_total`
    // -- confirm against the input runner.
    pub sent_events_total: u64,
    // NOTE(review): presumably the expected number of errors -- confirm against the
    // validators that consume this struct.
    pub errors_total: u64,
    // NOTE(review): presumably the expected number of discarded events -- confirm
    // against the validators that consume this struct.
    pub discarded_events_total: u64,
}

#[cfg(all(test, feature = "component-validation-tests"))]
Expand Down
14 changes: 14 additions & 0 deletions src/components/validation/resources/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ impl TestEvent {
Self::Modified { event, .. } => event,
}
}

/// Returns a mutable reference to the wrapped event.
///
/// Both the `Passthrough` and `Modified` variants yield their inner `event`; the
/// `modified` flag of `Modified` is ignored here.
pub fn get_event(&mut self) -> &mut Event {
    match self {
        Self::Passthrough(event) | Self::Modified { event, .. } => event,
    }
}

/// Consumes the test event and splits it into a `(modified, event)` pair.
///
/// A `Modified` test event yields its stored `modified` flag alongside the event;
/// a `Passthrough` test event always yields `false`.
pub fn get(self) -> (bool, Event) {
    match self {
        Self::Modified { modified, event } => (modified, event),
        Self::Passthrough(event) => (false, event),
    }
}
}

#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
Expand Down
74 changes: 59 additions & 15 deletions src/components/validation/resources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ use tokio::{
};
use tokio_util::codec::Decoder;

use crate::components::validation::sync::{Configuring, TaskCoordinator};
use vector_lib::event::Event;
use crate::components::validation::{
sync::{Configuring, TaskCoordinator},
RunnerMetrics,
};
use vector_lib::{event::Event, EstimatedJsonEncodedSizeOf};

use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent};

Expand All @@ -43,11 +46,12 @@ impl HttpResourceConfig {
codec: ResourceCodec,
input_rx: mpsc::Receiver<TestEvent>,
task_coordinator: &TaskCoordinator<Configuring>,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
) {
match direction {
// The source will pull data from us.
ResourceDirection::Pull => {
spawn_input_http_server(self, codec, input_rx, task_coordinator)
spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
}
// We'll push data to the source.
ResourceDirection::Push => {
Expand All @@ -62,6 +66,7 @@ impl HttpResourceConfig {
codec: ResourceCodec,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
) -> vector_lib::Result<()> {
match direction {
// We'll pull data from the sink.
Expand All @@ -73,7 +78,7 @@ impl HttpResourceConfig {
)),
// The sink will push data to us.
ResourceDirection::Push => {
spawn_output_http_server(self, codec, output_tx, task_coordinator)
spawn_output_http_server(self, codec, output_tx, task_coordinator, runner_metrics)
}
}
}
Expand All @@ -86,6 +91,7 @@ fn spawn_input_http_server(
codec: ResourceCodec,
mut input_rx: mpsc::Receiver<TestEvent>,
task_coordinator: &TaskCoordinator<Configuring>,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
) {
// This HTTP server will poll the input receiver for input events and buffer them. When a
// request comes in on the right path/method, one buffered input event will be sent back. If no
Expand All @@ -97,8 +103,11 @@ fn spawn_input_http_server(
let encoder = codec.into_encoder();
let sendable_events = Arc::clone(&outstanding_events);

let (resource_notifier, http_server_shutdown_tx) =
spawn_http_server(task_coordinator, &config, move |_| {
let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
task_coordinator,
&config,
runner_metrics,
move |_request, _runner_metrics| {
let sendable_events = Arc::clone(&sendable_events);
let mut encoder = encoder.clone();

Expand All @@ -116,12 +125,14 @@ fn spawn_input_http_server(
StatusCode::OK.into_response()
}
}
});
},
);

// Now we'll create and spawn the resource's core logic loop which drives the buffering of input
// events and working with the HTTP server as they're consumed.
let resource_started = task_coordinator.track_started();
let resource_completed = task_coordinator.track_completed();

tokio::spawn(async move {
resource_started.mark_as_done();
debug!("HTTP server external input resource started.");
Expand All @@ -140,7 +151,10 @@ fn spawn_input_http_server(
let mut outstanding_events = outstanding_events.lock().await;
outstanding_events.push_back(event);
},
None => input_finished = true,
None => {
trace!("HTTP server external input resource input is finished.");
input_finished = true;
},
},

_ = resource_notifier.notified() => {
Expand All @@ -159,10 +173,15 @@ fn spawn_input_http_server(
},
}
}

// Mark ourselves as completed now that we've sent all inputs to the source, and
// additionally signal the HTTP server to also gracefully shutdown.
_ = http_server_shutdown_tx.send(());

// TODO: we are currently getting lucky in the testing of the `http_client` source. If the source
// queries this server after we have already shut it down, the request fails and generates an
// error, which can throw off our error validation.
// The likely solution is to add synchronization here so that we wait for the runner to tell us to
// shut down.

Comment on lines +179 to +184
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, yeah.

I think what we probably want is to trigger the shutdown of the topology once we know all events on the input side have been sent or consumed. Once the events are in the topology, we should be "safe", in the sense that the topology won't actually shut down until those events make it through to the sink and are processed.

So for an HTTP server external resource, we'd want the shutdown signal to tell it to shut down, but only after all events have been sent to the source, which in this case essentially means waiting until the `http_client` source has queried the HTTP server enough times, etc. For a push-based source, we'd only care that it managed to send out all events and that we'd gotten a response for each of them.

So we'd essentially update external input resources to have that behavior, and then the shutdown code in the runner would look like this:

  • trigger the external input resource to shut down
  • trigger the topology to shut down
  • wait for the external input resource to shut down/complete
  • wait for the topology to shut down/complete
  • the rest of the steps, such as waiting for the external output resource, etc.

Does that make sense?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, yes. I believe I started down this path and then decided to table it for the time being. I will take a look at it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is isolated enough to put in a separate PR, so I opened a ticket to track it. If it's alright, I'll probably tag you on the review for that as well, since you understand the architecture of this framework.

resource_completed.mark_as_done();

debug!("HTTP server external input resource completed.");
Expand Down Expand Up @@ -230,15 +249,19 @@ fn spawn_output_http_server(
codec: ResourceCodec,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
) -> vector_lib::Result<()> {
// This HTTP server will wait for events to be sent by a sink, and collect them and send them on
// via an output sender. We accept/collect events until we're told to shutdown.

// First, we'll build and spawn our HTTP server.
let decoder = codec.into_decoder()?;

let (_, http_server_shutdown_tx) =
spawn_http_server(task_coordinator, &config, move |request| {
let (_, http_server_shutdown_tx) = spawn_http_server(
task_coordinator,
&config,
runner_metrics,
move |request, output_runner_metrics| {
let output_tx = output_tx.clone();
let mut decoder = decoder.clone();

Expand All @@ -249,7 +272,23 @@ fn spawn_output_http_server(
let mut body = BytesMut::from(&body[..]);
loop {
match decoder.decode_eof(&mut body) {
Ok(Some((events, _byte_size))) => {
Ok(Some((events, byte_size))) => {
let mut output_runner_metrics =
output_runner_metrics.lock().await;
debug!("HTTP server external output resource decoded {byte_size} bytes.");

// Update the runner metrics for the received events. This will later
// be used in the Validators, as the "expected" case.
output_runner_metrics.received_bytes_total += byte_size as u64;

output_runner_metrics.received_events_total +=
events.len() as u64;

events.iter().for_each(|event| {
output_runner_metrics.received_event_bytes_total +=
event.estimated_json_encoded_size_of().get() as u64;
});

output_tx
.send(events.to_vec())
.await
Expand All @@ -262,13 +301,15 @@ fn spawn_output_http_server(
}
}
}
});
},
);

// Now we'll create and spawn the resource's core logic loop which simply waits for the runner
// to instruct us to shutdown, and when that happens, cascades to shutting down the HTTP server.
let resource_started = task_coordinator.track_started();
let resource_completed = task_coordinator.track_completed();
let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();

tokio::spawn(async move {
resource_started.mark_as_done();
debug!("HTTP server external output resource started.");
Expand Down Expand Up @@ -299,10 +340,11 @@ fn spawn_output_http_client(
fn spawn_http_server<H, F, R>(
task_coordinator: &TaskCoordinator<Configuring>,
config: &HttpResourceConfig,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
handler: H,
) -> (Arc<Notify>, oneshot::Sender<()>)
where
H: Fn(Request<Body>) -> F + Clone + Send + 'static,
H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
F: Future<Output = R> + Send,
R: IntoResponse,
{
Expand All @@ -327,6 +369,8 @@ where
let resource_notifier = Arc::new(Notify::new());
let server_notifier = Arc::clone(&resource_notifier);

let output_runner_metrics = Arc::clone(runner_metrics);

tokio::spawn(async move {
// Create our HTTP server by binding as early as possible to return an error if we can't
// actually bind.
Expand All @@ -353,7 +397,7 @@ where
StatusCode::METHOD_NOT_ALLOWED
})
.on(method_filter, move |request: Request<Body>| {
let request_handler = handler(request);
let request_handler = handler(request, output_runner_metrics);
let notifier = Arc::clone(&server_notifier);

async move {
Expand Down
33 changes: 24 additions & 9 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod event;
mod http;

use tokio::sync::mpsc;
use std::sync::Arc;

use tokio::sync::{mpsc, Mutex};
use vector_lib::codecs::{
decoding::{self, DeserializerConfig},
encoding::{
Expand All @@ -16,7 +18,10 @@ use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingCo
pub use self::event::{encode_test_event, TestEvent};
pub use self::http::HttpResourceConfig;

use super::sync::{Configuring, TaskCoordinator};
use super::{
sync::{Configuring, TaskCoordinator},
RunnerMetrics,
};

/// The codec used by the external resource.
///
Expand Down Expand Up @@ -292,7 +297,7 @@ impl From<HttpResourceConfig> for ResourceDefinition {
/// the external resource must pull the data from the sink.
#[derive(Clone)]
pub struct ExternalResource {
direction: ResourceDirection,
pub direction: ResourceDirection,
definition: ResourceDefinition,
pub codec: ResourceCodec,
}
Expand All @@ -316,11 +321,16 @@ impl ExternalResource {
self,
input_rx: mpsc::Receiver<TestEvent>,
task_coordinator: &TaskCoordinator<Configuring>,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
) {
match self.definition {
ResourceDefinition::Http(http_config) => {
http_config.spawn_as_input(self.direction, self.codec, input_rx, task_coordinator)
}
ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
self.direction,
self.codec,
input_rx,
task_coordinator,
runner_metrics,
),
}
}

Expand All @@ -329,11 +339,16 @@ impl ExternalResource {
self,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
runner_metrics: &Arc<Mutex<RunnerMetrics>>,
) -> vector_lib::Result<()> {
match self.definition {
ResourceDefinition::Http(http_config) => {
http_config.spawn_as_output(self.direction, self.codec, output_tx, task_coordinator)
}
ResourceDefinition::Http(http_config) => http_config.spawn_as_output(
self.direction,
self.codec,
output_tx,
task_coordinator,
runner_metrics,
),
}
}
}
7 changes: 1 addition & 6 deletions src/components/validation/runner/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
components::validation::{
component_names::*,
sync::{Configuring, TaskCoordinator},
util::GrpcAddress,
ComponentConfiguration, ComponentType, ValidationConfiguration,
Expand All @@ -21,12 +22,6 @@ pub struct TopologyBuilder {
output_edge: Option<OutputEdge>,
}

pub const TEST_SOURCE_NAME: &str = "test_source";
pub const TEST_SINK_NAME: &str = "test_sink";
pub const TEST_TRANSFORM_NAME: &str = "test_transform";
pub const TEST_INPUT_SOURCE_NAME: &str = "input_source";
pub const TEST_OUTPUT_SINK_NAME: &str = "output_sink";

impl TopologyBuilder {
/// Creates a component topology for the given component configuration.
pub fn from_configuration(configuration: &ValidationConfiguration) -> Self {
Expand Down
Loading