Skip to content
13 changes: 13 additions & 0 deletions src/components/validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ macro_rules! register_validatable_component {
};
}

/// Input and Output runners populate this structure as they send and receive events.
/// The structure is passed into the validator to use as the expected values for the
/// metrics that the components under test actually output.
#[derive(Default)]
pub struct RunnerMetrics {
    /// Total number of events received by the component under test.
    pub received_events_total: u64,
    /// Total byte size of the received events (event-level accounting).
    pub received_event_bytes_total: u64,
    /// Total raw bytes received on the wire.
    pub received_bytes_total: u64,
    pub sent_bytes_total: u64, // send-side mirror of received_bytes_total
    /// Total byte size of the sent events (event-level accounting).
    pub sent_event_bytes_total: u64,
    /// Total number of events sent by the component under test.
    pub sent_events_total: u64,
}

#[cfg(all(test, feature = "component-validation-tests"))]
mod tests {
use std::{
Expand Down
111 changes: 100 additions & 11 deletions src/components/validation/resources/event.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
use bytes::BytesMut;
use serde::Deserialize;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;

use crate::codecs::Encoder;
use codecs::{
encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
NewlineDelimitedEncoder,
};
use vector_core::event::{Event, LogEvent};

/// An event used in a test case.
/// A test case event for deserialization from yaml file.
/// This is an intermediary step to TestEvent.
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
pub enum TestEvent {
pub enum RawTestEvent {
/// The event is used, as-is, without modification.
Passthrough(EventData),

Expand All @@ -20,15 +30,6 @@ pub enum TestEvent {
Modified { modified: bool, event: EventData },
}

impl TestEvent {
pub fn into_event(self) -> Event {
match self {
Self::Passthrough(event) => event.into_event(),
Self::Modified { event, .. } => event.into_event(),
}
}
}

#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
pub enum EventData {
Expand All @@ -44,3 +45,91 @@ impl EventData {
}
}
}

/// An event used in a test case.
/// It is important to have created the event with all fields, immediately after deserializing from the
/// test case definition yaml file. This ensures that the event data we are using in the expected/actual
/// metrics collection is based on the same event. Namely, one issue that can arise from creating the event
/// from the event data twice (once for the expected and once for actual), it can result in a timestamp in
/// the event which may or may not have the same millisecond precision as its counterpart.
// Deserialization goes through `RawTestEvent` first (see `From<RawTestEvent>` below),
// so the `Event` is constructed exactly once.
// NOTE(review): `untagged` looks redundant here — with `from = "RawTestEvent"` the
// derived impl delegates entirely to `RawTestEvent` — confirm before removing.
#[derive(Clone, Debug, Deserialize)]
#[serde(from = "RawTestEvent")]
#[serde(untagged)]
pub enum TestEvent {
    /// The event is used, as-is, without modification.
    Passthrough(Event),

    /// The event is potentially modified by the external resource.
    ///
    /// The modification made is dependent on the external resource, but this mode is made available
    /// for when a test case wants to exercise the failure path, but cannot cause a failure simply
    /// by constructing the event in a certain way i.e. adding an invalid field, or removing a
    /// required field, or using an invalid field value, and so on.
    ///
    /// For transforms and sinks, generally, the only way to cause an error is if the event itself
    /// is malformed in some way, which can be achieved without this test event variant.
    Modified { modified: bool, event: Event },
}

impl TestEvent {
    /// Consumes this test event, returning the underlying [`Event`].
    ///
    /// Both variants carry an event, so the distinction between them is
    /// discarded here; use pattern matching directly if it matters.
    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
    pub fn into_event(self) -> Event {
        match self {
            Self::Passthrough(event) | Self::Modified { event, .. } => event,
        }
    }
}

/// Errors arising while turning a `RawTestEvent` into a [`TestEvent`].
///
/// Currently empty: the `From<RawTestEvent>` conversion below is infallible,
/// so no variants are ever constructed.
// NOTE(review): this type appears unused in the visible code — presumably kept
// for a future fallible (`serde(try_from)`) conversion; confirm before removing.
#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
pub enum RawTestEventParseError {}

impl From<RawTestEvent> for TestEvent {
fn from(other: RawTestEvent) -> Self {
match other {
RawTestEvent::Passthrough(event_data) => {
TestEvent::Passthrough(event_data.into_event())
}
RawTestEvent::Modified { modified, event } => TestEvent::Modified {
modified,
event: event.into_event(),
},
}
}
}

/// Encodes a test event into `buf`.
///
/// `Passthrough` events are encoded normally with `encoder`. `Modified` events
/// are deliberately encoded with an *opposing* codec so that the component
/// under test observes a payload its configured decoder should reject.
pub fn encode_test_event(
    encoder: &mut Encoder<encoding::Framer>,
    buf: &mut BytesMut,
    event: TestEvent,
) {
    match event {
        TestEvent::Passthrough(event) => encoder
            .encode(event, buf)
            .expect("should not fail to encode input event"),
        TestEvent::Modified { event, .. } => {
            // This is a little fragile, but we check what serializer this
            // encoder uses, and based on `Serializer::supports_json`, we choose
            // an opposing codec: if the encoder supports JSON we encode with
            // logfmt (length-delimited), and vice versa.
            let supports_json = encoder.serializer().supports_json();
            let mut alt_encoder = if !supports_json {
                Encoder::<encoding::Framer>::new(
                    NewlineDelimitedEncoder::new().into(),
                    JsonSerializer::new(MetricTagValues::default()).into(),
                )
            } else {
                Encoder::<encoding::Framer>::new(
                    LengthDelimitedEncoder::new().into(),
                    LogfmtSerializer::new().into(),
                )
            };

            alt_encoder
                .encode(event, buf)
                .expect("should not fail to encode input event");
        }
    }
}
69 changes: 12 additions & 57 deletions src/components/validation/resources/http.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
collections::VecDeque,
future::Future,
net::{IpAddr, SocketAddr},
str::FromStr,
sync::Arc,
Expand All @@ -11,26 +12,18 @@ use axum::{
Router,
};
use bytes::BytesMut;
use codecs::{
encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
NewlineDelimitedEncoder,
};
use http::{Method, Request, StatusCode, Uri};
use hyper::{Body, Client, Server};
use std::future::Future;
use tokio::{
select,
sync::{mpsc, oneshot, Mutex, Notify},
};
use tokio_util::codec::{Decoder, Encoder as _};
use vector_core::event::Event;
use tokio_util::codec::Decoder;

use crate::{
codecs::Encoder,
components::validation::sync::{Configuring, TaskCoordinator},
};
use crate::components::validation::sync::{Configuring, TaskCoordinator};
use vector_core::event::Event;

use super::{ResourceCodec, ResourceDirection, TestEvent};
use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent};

/// An HTTP resource.
#[derive(Clone)]
Expand Down Expand Up @@ -67,7 +60,7 @@ impl HttpResourceConfig {
self,
direction: ResourceDirection,
codec: ResourceCodec,
output_tx: mpsc::Sender<Event>,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
match direction {
Expand Down Expand Up @@ -230,7 +223,7 @@ fn spawn_input_http_client(
fn spawn_output_http_server(
config: HttpResourceConfig,
codec: ResourceCodec,
output_tx: mpsc::Sender<Event>,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
// This HTTP server will wait for events to be sent by a sink, and collect them and send them on
Expand All @@ -252,12 +245,10 @@ fn spawn_output_http_server(
loop {
match decoder.decode_eof(&mut body) {
Ok(Some((events, _byte_size))) => {
for event in events {
output_tx
.send(event)
.await
.expect("should not fail to send output event");
}
output_tx
.send(events.to_vec())
.await
.expect("should not fail to send output event");
}
Ok(None) => return StatusCode::OK.into_response(),
Err(_) => return StatusCode::INTERNAL_SERVER_ERROR.into_response(),
Expand Down Expand Up @@ -290,7 +281,7 @@ fn spawn_output_http_server(
fn spawn_output_http_client(
_config: HttpResourceConfig,
_codec: ResourceCodec,
_output_tx: mpsc::Sender<Event>,
_output_tx: mpsc::Sender<Vec<Event>>,
_task_coordinator: &TaskCoordinator<Configuring>,
) {
// TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be
Expand Down Expand Up @@ -400,39 +391,3 @@ fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {

SocketAddr::from((uri_host, uri_port))
}

/// Encodes a test event into `buf`.
///
/// `Passthrough` events are encoded normally with `encoder`; `Modified` events
/// are encoded with a deliberately mismatched codec to exercise failure paths.
// NOTE(review): the `TestEvent` variants here carry `EventData`, hence the
// `into_event()` calls at each encode site — the event is constructed at
// encode time rather than at deserialization time.
pub fn encode_test_event(
    encoder: &mut Encoder<encoding::Framer>,
    buf: &mut BytesMut,
    event: TestEvent,
) {
    match event {
        TestEvent::Passthrough(event) => {
            // Encode the event normally.
            encoder
                .encode(event.into_event(), buf)
                .expect("should not fail to encode input event");
        }
        TestEvent::Modified { event, .. } => {
            // This is a little fragile, but we check what serializer this encoder uses, and based
            // on `Serializer::supports_json`, we choose an opposing codec. For example, if the
            // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vice
            // versa.
            let mut alt_encoder = if encoder.serializer().supports_json() {
                Encoder::<encoding::Framer>::new(
                    LengthDelimitedEncoder::new().into(),
                    LogfmtSerializer::new().into(),
                )
            } else {
                Encoder::<encoding::Framer>::new(
                    NewlineDelimitedEncoder::new().into(),
                    JsonSerializer::new(MetricTagValues::default()).into(),
                )
            };

            alt_encoder
                .encode(event.into_event(), buf)
                .expect("should not fail to encode input event");
        }
    }
}
6 changes: 3 additions & 3 deletions src/components/validation/resources/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use vector_core::{config::DataType, event::Event};

use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};

pub use self::event::{EventData, TestEvent};
pub use self::http::{encode_test_event, HttpResourceConfig};
pub use self::event::{encode_test_event, TestEvent};
pub use self::http::HttpResourceConfig;

use super::sync::{Configuring, TaskCoordinator};

Expand Down Expand Up @@ -308,7 +308,7 @@ impl ExternalResource {
/// Spawns this resource for use as an output for a sink.
pub fn spawn_as_output(
self,
output_tx: mpsc::Sender<Event>,
output_tx: mpsc::Sender<Vec<Event>>,
task_coordinator: &TaskCoordinator<Configuring>,
) {
match self.definition {
Expand Down
31 changes: 17 additions & 14 deletions src/components/validation/runner/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ use crate::{

#[derive(Clone)]
pub struct EventForwardService {
tx: mpsc::Sender<Event>,
tx: mpsc::Sender<Vec<Event>>,
}

impl From<mpsc::Sender<Event>> for EventForwardService {
fn from(tx: mpsc::Sender<Event>) -> Self {
impl From<mpsc::Sender<Vec<Event>>> for EventForwardService {
fn from(tx: mpsc::Sender<Vec<Event>>) -> Self {
Self { tx }
}
}
Expand All @@ -42,14 +42,17 @@ impl VectorService for EventForwardService {
&self,
request: tonic::Request<PushEventsRequest>,
) -> Result<tonic::Response<PushEventsResponse>, Status> {
let events = request.into_inner().events.into_iter().map(Event::from);

for event in events {
self.tx
.send(event)
.await
.expect("event forward rx should not close first");
}
let events = request
.into_inner()
.events
.into_iter()
.map(Event::from)
.collect();

self.tx
.send(events)
.await
.expect("event forward rx should not close first");

Ok(tonic::Response::new(PushEventsResponse {}))
}
Expand All @@ -74,7 +77,7 @@ pub struct InputEdge {
pub struct OutputEdge {
listen_addr: GrpcAddress,
service: VectorServer<EventForwardService>,
rx: mpsc::Receiver<Event>,
rx: mpsc::Receiver<Vec<Event>>,
}

impl InputEdge {
Expand Down Expand Up @@ -129,7 +132,7 @@ impl OutputEdge {
pub fn spawn_output_server(
self,
task_coordinator: &TaskCoordinator<Configuring>,
) -> mpsc::Receiver<Event> {
) -> mpsc::Receiver<Vec<Event>> {
spawn_grpc_server(self.listen_addr, self.service, task_coordinator);
self.rx
}
Expand Down Expand Up @@ -184,5 +187,5 @@ pub fn spawn_grpc_server<S>(

pub struct ControlledEdges {
pub input: Option<mpsc::Sender<TestEvent>>,
pub output: Option<mpsc::Receiver<Event>>,
pub output: Option<mpsc::Receiver<Vec<Event>>>,
}
Loading