diff --git a/Cargo.toml b/Cargo.toml index 0eedc1b130aec..2a334f635b222 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -608,6 +608,7 @@ transforms-logs = [ "transforms-aws_ec2_metadata", "transforms-dedupe", "transforms-filter", + "transforms-gate", "transforms-log_to_metric", "transforms-lua", "transforms-metric_to_log", @@ -634,6 +635,7 @@ transforms-aggregate = [] transforms-aws_ec2_metadata = ["dep:arc-swap"] transforms-dedupe = ["transforms-impl-dedupe"] transforms-filter = [] +transforms-gate = [] transforms-log_to_metric = [] transforms-lua = ["dep:mlua", "vector-lib/lua"] transforms-metric_to_log = [] @@ -647,6 +649,7 @@ transforms-throttle = ["dep:governor"] # Implementations of transforms transforms-impl-sample = [] +transforms-impl-gate = [] transforms-impl-dedupe = ["dep:lru"] transforms-impl-reduce = [] diff --git a/src/internal_events/gate.rs b/src/internal_events/gate.rs new file mode 100644 index 0000000000000..d24e3e4282951 --- /dev/null +++ b/src/internal_events/gate.rs @@ -0,0 +1,14 @@ +use vector_lib::internal_event::{ComponentEventsDropped, Count, INTENTIONAL, Registered}; + +vector_lib::registered_event!( + GateEventsDropped => { + events_dropped: Registered> + = register!(ComponentEventsDropped::::from( + "The gate was closed." + )), + } + + fn emit(&self, data: Count) { + self.events_dropped.emit(data); + } +); diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index a303cdabe6175..112717d0eebb8 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -53,6 +53,8 @@ mod exec; mod file_descriptor; #[cfg(feature = "transforms-filter")] mod filter; +#[cfg(feature = "transforms-gate")] +mod gate; #[cfg(feature = "sources-fluent")] mod fluent; #[cfg(feature = "sources-gcp_pubsub")] @@ -195,6 +197,8 @@ pub(crate) use self::file::*; pub(crate) use self::file_descriptor::*; #[cfg(feature = "transforms-filter")] pub(crate) use self::filter::*; +#[cfg(feature = "transforms-gate")] +pub(crate) use self::gate::*; #[cfg(feature = "sources-fluent")] pub(crate) use self::fluent::*; #[cfg(feature = "sources-gcp_pubsub")] diff --git a/src/transforms/gate/config.rs b/src/transforms/gate/config.rs new file mode 100644 index 0000000000000..4c469fa492622 --- /dev/null +++ b/src/transforms/gate/config.rs @@ -0,0 +1,96 @@ +use vector_lib::config::{clone_input_definitions, LogNamespace}; +use vector_lib::configurable::configurable_component; + +use crate::{ + conditions::AnyCondition, + config::{ + DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, + TransformOutput, + }, + schema, + transforms::Transform, +}; + +use super::transform::Gate; + + +/// Configuration for the `gate` transform. +#[configurable_component(transform( +"gate", +"Open or close an event stream based on supplied criteria" +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct GateConfig { + /// A logical condition used to pass events through without gating. + pub pass_when: Option, + + /// A logical condition used to open the gate. + pub open_when: Option, + + /// A logical condition used to close the gate. + pub close_when: Option, + + /// Maximum number of events to keep in the buffer. + pub max_events: Option, + + /// Automatically close the gate after the buffer has been flushed. + pub auto_close: Option, + + /// Keep the gate open for additional number of events after the buffer has been flushed. + pub tail_events: Option, +} + +impl GenerateConfig for GateConfig { + fn generate_config() -> toml::Value { + toml::Value::try_from(Self { + pass_when: None::, + open_when: None::, + close_when: None::, + max_events: None::, + auto_close: None::, + tail_events: None::, + }).unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "gate")] +impl TransformConfig for GateConfig { + async fn build(&self, context: &TransformContext) -> crate::Result { + Ok(Transform::function(Gate::new( + self.pass_when + .as_ref() + .map(|condition| condition.build(&context.enrichment_tables)) + .transpose()?, + self.open_when + .as_ref() + .map(|condition| condition.build(&context.enrichment_tables)) + .transpose()?, + self.close_when + .as_ref() + .map(|condition| condition.build(&context.enrichment_tables)) + .transpose()?, + self.max_events.unwrap_or(100), + self.auto_close.unwrap_or(true), + self.tail_events.unwrap_or(10), + ).unwrap())) + } + + fn input(&self) -> Input { + Input::new(DataType::Log | DataType::Trace) + } + + fn outputs( + &self, + _: vector_lib::enrichment::TableRegistry, + input_definitions: &[(OutputId, schema::Definition)], + _: LogNamespace, + ) -> Vec { + // The event is not modified, so the definition is passed through as-is + vec![TransformOutput::new( + DataType::Log | DataType::Trace, + clone_input_definitions(input_definitions), + )] + } +} diff --git a/src/transforms/gate/mod.rs b/src/transforms/gate/mod.rs new file mode 100644 index 0000000000000..b7f94fef20595 --- /dev/null +++ b/src/transforms/gate/mod.rs @@ -0,0 +1,3 @@ +pub mod config; +pub mod transform; +mod state; diff --git a/src/transforms/gate/state.rs b/src/transforms/gate/state.rs new file mode 100644 index 0000000000000..2c0a35e662912 --- /dev/null +++ b/src/transforms/gate/state.rs @@ -0,0 +1,14 @@ +use crate::sinks::prelude::configurable_component; + +/// Gate state +#[configurable_component] +#[derive(Clone, Debug, Copy)] +#[derive(PartialEq)] +#[serde(rename_all = "snake_case")] +pub enum GateState { + /// Gate is open + Open, + + /// Gate is closed + Closed, +} diff --git a/src/transforms/gate/transform.rs b/src/transforms/gate/transform.rs new file mode 100644 index 0000000000000..05fd58d9d55fc --- /dev/null +++ b/src/transforms/gate/transform.rs @@ -0,0 +1,268 @@ +use std::collections::VecDeque; + +use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered}; + +use crate::{ + conditions::Condition, + event::Event, + internal_events::GateEventsDropped, + transforms::{FunctionTransform, OutputBuffer}, + transforms::gate::state::GateState, +}; + +#[derive(Clone)] +pub struct Gate { + // Configuration parameters + pass_when: Option, + open_when: Option, + close_when: Option, + max_events: usize, + auto_close: bool, + tail_events: usize, + + // Internal variables + current_state: GateState, + buffer: VecDeque, + events_counter: usize, + events_dropped: Registered, + is_closing: bool, +} + +impl Gate { + // This function is dead code when the feature flag `transforms-impl-gate` is specified but not + // `transforms-gate`. + #![allow(dead_code)] + pub fn new( + pass_when: Option, + open_when: Option, + close_when: Option, + max_events: usize, + auto_close: bool, + tail_events: usize, + ) -> crate::Result { + let buffer = VecDeque::with_capacity(max_events); + + Ok(Gate { + pass_when, + open_when, + close_when, + max_events, + auto_close, + tail_events, + events_dropped: register!(GateEventsDropped), + current_state: GateState::Closed, + buffer, + events_counter: 0, + is_closing: false, + }) + } +} + +impl FunctionTransform for Gate { + fn transform(&mut self, output: &mut OutputBuffer, event: Event) { + let (pass_gate, event) = match self.pass_when.as_ref() { + Some(condition) => { + let (result, event) = condition.check(event); + (result, event) + } + _ => (false, event) + }; + + let (open_gate, event) = match self.open_when.as_ref() { + Some(condition) => { + let (result, event) = condition.check(event); + (result, event) + } + _ => (false, event) + }; + + let (close_gate, event) = match self.close_when.as_ref() { + Some(condition) => { + let (result, event) = condition.check(event); + (result, event) + } + _ => (false, event) + }; + + if self.buffer.capacity() < self.max_events { + self.buffer.reserve(self.max_events); + } + + if self.buffer.len() >= self.max_events { + self.buffer.pop_front(); + } + + self.buffer.push_back(event); + + if pass_gate { + self.buffer.pop_back().map(|evt| output.push(evt)); + } else if open_gate { + self.current_state = GateState::Open; + self.buffer.drain(..).for_each(|evt| output.push(evt)); + self.events_counter = 0; + + if self.auto_close { + self.is_closing = true; + } + } else if close_gate { + self.buffer.pop_back().map(|evt| output.push(evt)); + self.is_closing = true; + } else if self.current_state == GateState::Open { + self.buffer.pop_back().map(|evt| output.push(evt)); + } else { + self.events_dropped.emit(Count(1)); + } + + if self.is_closing { + self.events_counter += 1; + + if self.events_counter > self.tail_events { + self.current_state = GateState::Closed; + self.events_counter = 0; + self.is_closing = false; + } + } + } +} + +#[cfg(test)] +mod test { + use tokio::sync::mpsc; + use tokio_stream::wrappers::ReceiverStream; + use vrl::core::Value; + + use crate::{ + event::{Event, LogEvent}, + test_util::components::assert_transform_compliance, + transforms::test::create_topology, + }; + use crate::conditions::{ConditionConfig, VrlConfig}; + use crate::transforms::gate::config::GateConfig; + + use super::*; + + #[tokio::test] + async fn gate_manual_close() { + assert_transform_compliance(async { + let open_config = VrlConfig { + source: String::from(r#".message == "open""#), + runtime: Default::default(), + }; + + let close_config = VrlConfig { + source: String::from(r#".message == "close""#), + runtime: Default::default(), + }; + + let pass_config = VrlConfig { + source: String::from(r#".message == "hello""#), + runtime: Default::default(), + }; + + let gate_config = GateConfig { + auto_close: Some(false), + open_when: Some(AnyCondition::from(ConditionConfig::Vrl(open_config))), + close_when: Some(AnyCondition::from(ConditionConfig::Vrl(close_config))), + pass_when: Some(AnyCondition::from(ConditionConfig::Vrl(pass_config))), + max_events: Some(3), + tail_events: Some(3), + }; + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), gate_config).await; + + let messages = [ + "drop1", "drop2", "pass1", "hello", "pass2", "open", "pass3", "pass4", + "pass5", "close", "pass6", "pass7", "pass8", "drop3", "drop4", "drop5" + ]; + + let events = messages.map(|msg| Event::from(LogEvent::from(msg))); + + for evt in events { + tx.send(evt.clone()).await.unwrap(); + } + + assert_message("hello", out.recv().await).await; + assert_message("pass1", out.recv().await).await; + assert_message("pass2", out.recv().await).await; + assert_message("open", out.recv().await).await; + assert_message("pass3", out.recv().await).await; + assert_message("pass4", out.recv().await).await; + assert_message("pass5", out.recv().await).await; + assert_message("close", out.recv().await).await; + assert_message("pass6", out.recv().await).await; + assert_message("pass7", out.recv().await).await; + assert_message("pass8", out.recv().await).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }).await; + } + + #[tokio::test] + async fn gate_auto_close() { + assert_transform_compliance(async { + let open_config = VrlConfig { + source: String::from(r#".message == "open""#), + runtime: Default::default(), + }; + + let close_config = VrlConfig { + source: String::from(r#".message == "close""#), + runtime: Default::default(), + }; + + let pass_config = VrlConfig { + source: String::from(r#".message == "hello""#), + runtime: Default::default(), + }; + + let gate_config = GateConfig { + auto_close: Some(true), + open_when: Some(AnyCondition::from(ConditionConfig::Vrl(open_config))), + close_when: Some(AnyCondition::from(ConditionConfig::Vrl(close_config))), + pass_when: Some(AnyCondition::from(ConditionConfig::Vrl(pass_config))), + max_events: Some(3), + tail_events: Some(3), + }; + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), gate_config).await; + + let messages = [ + "drop1", "drop2", "pass1", "hello", "pass2", "open", "pass3", "pass4", + "pass5", "drop3", "drop4", "drop5" + ]; + + let events = messages.map(|msg| Event::from(LogEvent::from(msg))); + + for evt in events { + tx.send(evt.clone()).await.unwrap(); + } + + assert_message("hello", out.recv().await).await; + assert_message("pass1", out.recv().await).await; + assert_message("pass2", out.recv().await).await; + assert_message("open", out.recv().await).await; + assert_message("pass3", out.recv().await).await; + assert_message("pass4", out.recv().await).await; + assert_message("pass5", out.recv().await).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }).await; + } + + async fn assert_message(message: &str, event: Option) { + assert_eq!( + &Value::from(message), + event.unwrap().as_log().get("message").unwrap() + ); + } +} diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index e3db831dafa0d..dd101cfcc3ae1 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -14,6 +14,8 @@ pub mod aggregate; pub mod aws_ec2_metadata; #[cfg(feature = "transforms-filter")] pub mod filter; +#[cfg(feature = "transforms-gate")] +pub mod gate; #[cfg(feature = "transforms-log_to_metric")] pub mod log_to_metric; #[cfg(feature = "transforms-lua")]