diff --git a/Cargo.toml b/Cargo.toml index 503132c85f507..5d01e3d2d0414 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -671,6 +671,7 @@ transforms-logs = [ "transforms-aws_ec2_metadata", "transforms-dedupe", "transforms-filter", + "transforms-window", "transforms-log_to_metric", "transforms-lua", "transforms-metric_to_log", @@ -696,6 +697,7 @@ transforms-aggregate = [] transforms-aws_ec2_metadata = ["dep:arc-swap"] transforms-dedupe = ["transforms-impl-dedupe"] transforms-filter = [] +transforms-window = [] transforms-log_to_metric = [] transforms-lua = ["dep:mlua", "vector-lib/lua"] transforms-metric_to_log = [] diff --git a/changelog.d/window-transform.feature.md b/changelog.d/window-transform.feature.md new file mode 100644 index 0000000000000..b3f366108c625 --- /dev/null +++ b/changelog.d/window-transform.feature.md @@ -0,0 +1,5 @@ +Add a new `window` transform, a variant of ring buffer or backtrace logging implemented as a sliding window. +Allows for reduction of log volume by filtering out logs when the system is healthy, but preserving detailed +logs when they are most relevant. 
+ +authors: ilinas diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 47e67943aeb07..52c5cafb7a6e7 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -129,6 +129,8 @@ mod unix; mod websocket; #[cfg(feature = "sinks-websocket-server")] mod websocket_server; +#[cfg(feature = "transforms-window")] +mod window; #[cfg(any( feature = "sources-file", @@ -265,6 +267,8 @@ pub(crate) use self::unix::*; pub(crate) use self::websocket::*; #[cfg(feature = "sinks-websocket-server")] pub(crate) use self::websocket_server::*; +#[cfg(feature = "transforms-window")] +pub(crate) use self::window::*; #[cfg(windows)] pub(crate) use self::windows::*; pub use self::{ diff --git a/src/internal_events/window.rs b/src/internal_events/window.rs new file mode 100644 index 0000000000000..5b2f666edcb11 --- /dev/null +++ b/src/internal_events/window.rs @@ -0,0 +1,14 @@ +use vector_lib::internal_event::{ComponentEventsDropped, Count, Registered, INTENTIONAL}; + +vector_lib::registered_event!( + WindowEventsDropped => { + events_dropped: Registered> + = register!(ComponentEventsDropped::::from( + "The buffer was full" + )), + } + + fn emit(&self, data: Count) { + self.events_dropped.emit(data); + } +); diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index ea3e9aa163afe..0247897ee295a 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -29,6 +29,8 @@ pub mod route; pub mod tag_cardinality_limit; #[cfg(feature = "transforms-throttle")] pub mod throttle; +#[cfg(feature = "transforms-window")] +pub mod window; pub use vector_lib::transform::{ FunctionTransform, OutputBuffer, SyncTransform, TaskTransform, Transform, TransformOutputs, diff --git a/src/transforms/window/config.rs b/src/transforms/window/config.rs new file mode 100644 index 0000000000000..21a2a5d36cafc --- /dev/null +++ b/src/transforms/window/config.rs @@ -0,0 +1,94 @@ +use vector_lib::config::{clone_input_definitions, LogNamespace}; +use 
vector_lib::configurable::configurable_component; + +use crate::{ + conditions::AnyCondition, + config::{ + DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, + TransformOutput, + }, + schema, + transforms::Transform, +}; + +use super::transform::Window; + +/// Configuration for the `window` transform. +#[configurable_component(transform( + "window", + "Apply a buffered sliding window over the stream of events and flush it based on supplied criteria" +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct WindowConfig { + /// A condition used to pass events through the transform without buffering. + /// + /// If the condition resolves to `true` for an event, the event is immediately forwarded without + /// buffering and without preserving the original order of events. Use with caution if the sink + /// cannot handle out of order events. + pub forward_when: Option, + + /// A condition used to flush the events. + /// + /// If the condition resolves to `true` for an event, the whole window is immediately flushed, + /// including the event itself, and any following events if `num_events_after` is more than zero. + pub flush_when: AnyCondition, + + /// The maximum number of events to keep before the event matched by the `flush_when` condition. + #[serde(default = "default_events_before")] + pub num_events_before: usize, + + /// The maximum number of events to keep after the event matched by the `flush_when` condition. 
+ #[serde(default = "default_events_after")] + pub num_events_after: usize, +} + +impl GenerateConfig for WindowConfig { + fn generate_config() -> toml::Value { + toml::from_str(r#"flush_when = ".message == \"value\"""#).unwrap() + } +} + +const fn default_events_before() -> usize { + 100 +} + +const fn default_events_after() -> usize { + 0 +} + +#[async_trait::async_trait] +#[typetag::serde(name = "window")] +impl TransformConfig for WindowConfig { + async fn build(&self, context: &TransformContext) -> crate::Result { + Ok(Transform::function( + Window::new( + self.forward_when + .as_ref() + .map(|condition| condition.build(&context.enrichment_tables)) + .transpose()?, + self.flush_when.build(&context.enrichment_tables)?, + self.num_events_before, + self.num_events_after, + ) + .unwrap(), + )) + } + + fn input(&self) -> Input { + Input::new(DataType::Log) + } + + fn outputs( + &self, + _: vector_lib::enrichment::TableRegistry, + input_definitions: &[(OutputId, schema::Definition)], + _: LogNamespace, + ) -> Vec { + // The event is not modified, so the definition is passed through as-is + vec![TransformOutput::new( + DataType::Log, + clone_input_definitions(input_definitions), + )] + } +} diff --git a/src/transforms/window/mod.rs b/src/transforms/window/mod.rs new file mode 100644 index 0000000000000..3068477ea9559 --- /dev/null +++ b/src/transforms/window/mod.rs @@ -0,0 +1,2 @@ +pub mod config; +pub mod transform; diff --git a/src/transforms/window/transform.rs b/src/transforms/window/transform.rs new file mode 100644 index 0000000000000..2149fbad61d0b --- /dev/null +++ b/src/transforms/window/transform.rs @@ -0,0 +1,405 @@ +use std::collections::VecDeque; + +use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered}; + +use crate::{ + conditions::Condition, + event::Event, + internal_events::WindowEventsDropped, + transforms::{FunctionTransform, OutputBuffer}, +}; + +#[derive(Clone)] +pub struct Window { + // Configuration parameters + 
forward_when: Option, + flush_when: Condition, + num_events_before: usize, + num_events_after: usize, + + // Internal variables + buffer: VecDeque, + events_counter: usize, + events_dropped: Registered, + is_flushing: bool, +} + +impl Window { + pub fn new( + forward_when: Option, + flush_when: Condition, + num_events_before: usize, + num_events_after: usize, + ) -> crate::Result { + let buffer = VecDeque::with_capacity(num_events_before); + + Ok(Window { + forward_when, + flush_when, + num_events_before, + num_events_after, + events_dropped: register!(WindowEventsDropped), + buffer, + events_counter: 0, + is_flushing: false, + }) + } +} + +impl FunctionTransform for Window { + fn transform(&mut self, output: &mut OutputBuffer, event: Event) { + let (pass, event) = match self.forward_when.as_ref() { + Some(condition) => { + let (result, event) = condition.check(event); + (result, event) + } + _ => (false, event), + }; + + let (flush, event) = self.flush_when.check(event); + + if self.buffer.capacity() < self.num_events_before { + self.buffer.reserve(self.num_events_before); + } + + if pass { + output.push(event); + } else if flush { + if self.num_events_before > 0 { + self.buffer.drain(..).for_each(|evt| output.push(evt)); + } + + self.events_counter = 0; + self.is_flushing = true; + output.push(event); + } else if self.is_flushing { + self.events_counter += 1; + + if self.events_counter > self.num_events_after { + self.events_counter = 0; + self.is_flushing = false; + self.events_dropped.emit(Count(1)); + } else { + output.push(event); + } + } else if self.buffer.len() >= self.num_events_before { + self.buffer.pop_front(); + self.buffer.push_back(event); + self.events_dropped.emit(Count(1)); + } else if self.num_events_before > 0 { + self.buffer.push_back(event); + } else { + self.events_dropped.emit(Count(1)); + } + } +} + +#[cfg(test)] +mod test { + use std::ops::RangeInclusive; + use tokio::sync::mpsc; + use tokio::sync::mpsc::{Receiver, Sender}; + use 
tokio_stream::wrappers::ReceiverStream; + use vrl::core::Value; + + use crate::conditions::{AnyCondition, ConditionConfig, VrlConfig}; + use crate::transforms::window::config::WindowConfig; + use crate::{ + event::{Event, LogEvent}, + test_util::components::assert_transform_compliance, + transforms::test::create_topology, + }; + + #[tokio::test] + async fn test_flush() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 1, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_event(&tx, "flush").await; + assert_event("flush", out.recv().await).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_pass() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let forward_when = get_condition("forward"); + let transform_config = get_transform_config(flush_when, Some(forward_when), 1, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_event(&tx, "forward").await; + assert_event("forward", out.recv().await).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_10_in_50() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 50, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=10)).await; + send_event(&tx, "flush").await; + + let mut expected: [&str; 11] = [ + "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", "A10", "flush", + ]; + + assert_events(&mut expected, 
&mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_50_in_10() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 10, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=50)).await; + send_event(&tx, "flush").await; + + let mut expected: [&str; 11] = [ + "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush", + ]; + + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_before_and_after() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 10, 5); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=50)).await; + send_event(&tx, "flush").await; + send_events(&tx, generate_events(51..=70)).await; + + let mut expected: [&str; 16] = [ + "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush", + "A51", "A52", "A53", "A54", "A55", + ]; + + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_flush_and_pass() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let forward_when = get_condition("forward"); + let transform_config = get_transform_config(flush_when, Some(forward_when), 50, 5); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + 
+ send_events(&tx, generate_events(1..=5)).await; + send_event(&tx, "forward").await; + send_events(&tx, generate_events(6..=10)).await; + send_event(&tx, "forward").await; + send_event(&tx, "flush").await; + send_event(&tx, "forward").await; + send_events(&tx, generate_events(11..=15)).await; + send_event(&tx, "forward").await; + send_events(&tx, generate_events(16..=20)).await; + + let mut expected: [&str; 20] = [ + "forward", "forward", "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", + "A09", "A10", "flush", "forward", "A11", "A12", "A13", "A14", "A15", "forward", + ]; + + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_zero_before() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 0, 5); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=50)).await; + send_event(&tx, "flush").await; + send_events(&tx, generate_events(51..=70)).await; + + let mut expected: [&str; 6] = ["flush", "A51", "A52", "A53", "A54", "A55"]; + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_zero_flush() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 0, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=50)).await; + send_event(&tx, "flush").await; + send_events(&tx, generate_events(51..=70)).await; + + let mut expected: [&str; 1] = ["flush"]; + assert_events(&mut expected, &mut 
out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_zero_pass() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let forward_when = get_condition("forward"); + let transform_config = get_transform_config(flush_when, Some(forward_when), 0, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + let events = generate_events(1..=50); + let more_events = generate_events(51..=70); + + send_events(&tx, events).await; + send_event(&tx, "forward").await; + send_event(&tx, "flush").await; + send_events(&tx, more_events).await; + + let mut expected: [&str; 2] = ["forward", "flush"]; + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + const fn get_transform_config( + flush_when: AnyCondition, + forward_when: Option, + num_events_before: usize, + num_events_after: usize, + ) -> WindowConfig { + WindowConfig { + flush_when, + forward_when, + num_events_before, + num_events_after, + } + } + + fn get_condition(message: &str) -> AnyCondition { + AnyCondition::from(ConditionConfig::Vrl(VrlConfig { + source: format!(r#".message == "{message}""#), + runtime: Default::default(), + })) + } + + fn generate_events(range: RangeInclusive) -> Vec { + range + .map(|n| format!("A{n:02}")) + .map(|m| Event::from(LogEvent::from(m))) + .collect::>() + } + + async fn send_events(tx: &Sender, events: Vec) { + for event in events { + tx.send(event).await.unwrap(); + } + } + + async fn send_event(tx: &Sender, message: &str) { + tx.send(Event::from(LogEvent::from(message))).await.unwrap(); + } + + async fn assert_event(message: &str, event: Option) { + assert_eq!( + &Value::from(message), + event.unwrap().as_log().get("message").unwrap() + ); + } + + async fn assert_events(messages: &mut 
[&str], out: &mut Receiver) { + for message in messages { + assert_event(message, out.recv().await).await; + } + } +} diff --git a/website/content/en/docs/reference/configuration/transforms/window.md b/website/content/en/docs/reference/configuration/transforms/window.md new file mode 100644 index 0000000000000..ade190195844f --- /dev/null +++ b/website/content/en/docs/reference/configuration/transforms/window.md @@ -0,0 +1,14 @@ +--- +title: Window +description: A variant of ring buffer or backtrace logging implemented as a sliding window +component_kind: transform +layout: component +tags: ["filter", "component", "transform", "logs"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... +*/}} diff --git a/website/cue/reference/components.cue b/website/cue/reference/components.cue index ba4879ffad47f..5a350c997a1d3 100644 --- a/website/cue/reference/components.cue +++ b/website/cue/reference/components.cue @@ -204,6 +204,7 @@ components: { exclusive_route?: #FeaturesExclusiveRoute sanitize?: #FeaturesSanitize shape?: #FeaturesShape + window?: #FeaturesWindow } if Args.kind == "sink" { @@ -335,6 +336,8 @@ components: { #FeaturesShape: {} + #FeaturesWindow: {} + #FeaturesSend: { _args: { egress_method: string diff --git a/website/cue/reference/components/transforms/base/window.cue b/website/cue/reference/components/transforms/base/window.cue new file mode 100644 index 0000000000000..6eeb587c9ece0 --- /dev/null +++ b/website/cue/reference/components/transforms/base/window.cue @@ -0,0 +1,35 @@ +package metadata + +base: components: transforms: window: configuration: { + flush_when: { + description: """ + A condition used to flush the events. + + If the condition resolves to `true` for an event, the whole window is immediately flushed, + including the event itself, and any following events if `num_events_after` is more than zero. 
+ """ + required: true + type: condition: {} + } + forward_when: { + description: """ + A condition used to pass events through the transform without buffering. + + If the condition resolves to `true` for an event, the event is immediately forwarded without + buffering and without preserving the original order of events. Use with caution if the sink + cannot handle out of order events. + """ + required: false + type: condition: {} + } + num_events_after: { + description: "The maximum number of events to keep after the event matched by the `flush_when` condition." + required: false + type: uint: default: 0 + } + num_events_before: { + description: "The maximum number of events to keep before the event matched by the `flush_when` condition." + required: false + type: uint: default: 100 + } +} diff --git a/website/cue/reference/components/transforms/window.cue b/website/cue/reference/components/transforms/window.cue new file mode 100644 index 0000000000000..4198d7b1afde5 --- /dev/null +++ b/website/cue/reference/components/transforms/window.cue @@ -0,0 +1,144 @@ +package metadata + +components: transforms: window: { + title: "Window" + + description: """ + A variant of ring buffer or backtrace logging implemented as a sliding window. Keeps events in a buffer until + the `flush_when` condition is matched. When the buffer is full, the oldest events are dropped. 
+ """ + + classes: { + commonly_used: false + development: "beta" + egress_method: "stream" + stateful: true + } + + features: { + filter: {} + } + + support: { + requirements: [] + warnings: [] + notices: [] + } + + configuration: base.components.transforms.window.configuration + + input: { + logs: true + metrics: null + traces: false + } + + examples: [ + { + title: "Flush recent events when an error happens" + input: [ + {log: {message: "A01", level: "info"}}, + {log: {message: "A02", level: "debug"}}, + {log: {message: "A03", level: "info"}}, + {log: {message: "A04", level: "debug"}}, + {log: {message: "A05", level: "error"}}, + {log: {message: "A06", level: "debug"}}, + {log: {message: "A07", level: "warning"}}, + {log: {message: "A08", level: "info"}}, + {log: {message: "A09", level: "debug"}}, + {log: {message: "A10", level: "info"}}, + ] + + configuration: { + flush_when: #".level == "error""# + num_events_before: 2 + num_events_after: 2 + } + + output: [ + {log: {message: "A03", level: "info"}}, + {log: {message: "A04", level: "debug"}}, + {log: {message: "A05", level: "error"}}, + {log: {message: "A06", level: "debug"}}, + {log: {message: "A07", level: "warning"}}, + ] + }, + + { + title: "Pass events through without preserving the order" + input: [ + {log: {message: "A01", level: "info"}}, + {log: {message: "A02", level: "debug"}}, + {log: {message: "A03", level: "info"}}, + {log: {message: "A04", level: "debug"}}, + {log: {message: "A05", level: "error"}}, + {log: {message: "A06", level: "debug"}}, + {log: {message: "A07", level: "warning"}}, + {log: {message: "A08", level: "info"}}, + {log: {message: "A09", level: "debug"}}, + {log: {message: "A10", level: "info"}}, + ] + + configuration: { + flush_when: #".level == "error""# + forward_when: #".level == "info""# + num_events_before: 2 + num_events_after: 2 + } + + output: [ + {log: {message: "A01", level: "info"}}, + {log: {message: "A03", level: "info"}}, + {log: {message: "A02", level: "debug"}}, + 
{log: {message: "A04", level: "debug"}}, + {log: {message: "A05", level: "error"}}, + {log: {message: "A06", level: "debug"}}, + {log: {message: "A07", level: "warning"}}, + {log: {message: "A08", level: "info"}}, + {log: {message: "A10", level: "info"}}, + ] + }, + ] + + how_it_works: { + advantages: { + title: "Advantages of Use" + body: """ + A common way to reduce log volume from a verbose system is to filter out less important messages, and only + ingest errors and warnings. However, an error message by itself may not be sufficient to determine the + cause, as surrounding events often contain important context information leading to the failure. + + The `window` transform allows for reduction of log volume by filtering out logs + when the system is healthy, but preserving detailed logs when they are most relevant. + """ + } + + sliding_window: { + title: "Sliding Window" + body: """ + As the stream of events passes through the transform, it is observed through a "window" that spans between + `num_events_before` and `num_events_after` relative to an event matched by the `flush_when` condition. When the + condition is matched, the whole window is flushed to the output. This is also known as backtrace logging or + ring buffer logging. + + {{< svg "img/sliding-window.svg" >}} + + Past events are stored in a memory buffer with the capacity of `num_events_before`. When the buffer is full, + the oldest events are dropped to make space for new ones. The buffer is not persistent, so in case of a hard + system crash, all the buffered events will be lost. + + Future events are counted from the event matched by the `flush_when` condition until `num_events_after` number + of events is reached. + + If the `flush_when` condition is matched before the buffer fills up, it will be flushed again.
If the flush + condition is triggered often enough (for example, the system is constantly logging errors), the transform may + always be in the flushing state, meaning that no events will be filtered. Therefore it works best for conditions + that are relatively uncommon. + """ + } + } + + telemetry: metrics: { + stale_events_flushed_total: components.sources.internal_metrics.output.metrics.stale_events_flushed_total + } +} diff --git a/website/static/img/sliding-window.svg b/website/static/img/sliding-window.svg new file mode 100644 index 0000000000000..165a060e2048b --- /dev/null +++ b/website/static/img/sliding-window.svg @@ -0,0 +1,418 @@ + + + + + + + + + + A + + + + B + + + + C + + + + D + + + + E + + + + F + + + + G + + + + H + + + + I + + + + J + + + + K + + + + L + + + + M + + + + + + Event of interest + Events before + + + + + + + + + Events after + +