From 1f3ca06a6d2e9766db1d0f82598bb6b58ac2107b Mon Sep 17 00:00:00 2001 From: Linas Zvirblis Date: Thu, 13 Feb 2025 17:39:28 +0100 Subject: [PATCH 1/8] feat(new transform): Add window transform --- Cargo.toml | 2 + changelog.d/window-transform.feature.md | 3 + src/internal_events/mod.rs | 4 + src/internal_events/window.rs | 14 + src/transforms/mod.rs | 2 + src/transforms/window/config.rs | 74 ++++ src/transforms/window/mod.rs | 2 + src/transforms/window/transform.rs | 405 +++++++++++++++++ .../configuration/transforms/window.md | 14 + website/cue/reference/components.cue | 3 + .../components/transforms/base/window.cue | 43 ++ .../components/transforms/window.cue | 143 ++++++ website/static/img/sliding-window.svg | 418 ++++++++++++++++++ 13 files changed, 1127 insertions(+) create mode 100644 changelog.d/window-transform.feature.md create mode 100644 src/internal_events/window.rs create mode 100644 src/transforms/window/config.rs create mode 100644 src/transforms/window/mod.rs create mode 100644 src/transforms/window/transform.rs create mode 100644 website/content/en/docs/reference/configuration/transforms/window.md create mode 100644 website/cue/reference/components/transforms/base/window.cue create mode 100644 website/cue/reference/components/transforms/window.cue create mode 100644 website/static/img/sliding-window.svg diff --git a/Cargo.toml b/Cargo.toml index c1428a70bd5b6..3dc6bf9489e5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -660,6 +660,7 @@ transforms-logs = [ "transforms-aws_ec2_metadata", "transforms-dedupe", "transforms-filter", + "transforms-window", "transforms-log_to_metric", "transforms-lua", "transforms-metric_to_log", @@ -685,6 +686,7 @@ transforms-aggregate = [] transforms-aws_ec2_metadata = ["dep:arc-swap"] transforms-dedupe = ["transforms-impl-dedupe"] transforms-filter = [] +transforms-window = [] transforms-log_to_metric = [] transforms-lua = ["dep:mlua", "vector-lib/lua"] transforms-metric_to_log = [] diff --git a/changelog.d/window-transform.feature.md b/changelog.d/window-transform.feature.md new file mode 100644 index 0000000000000..fd532d0138215 --- /dev/null +++ b/changelog.d/window-transform.feature.md @@ -0,0 +1,3 @@ +Add new `window` transform. + +authors: ilinas diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 12a323214ec6d..b6b16941d858d 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -129,6 +129,8 @@ mod unix; mod websocket; #[cfg(feature = "sinks-websocket-server")] mod websocket_server; +#[cfg(feature = "transforms-window")] +mod window; #[cfg(any( feature = "sources-file", @@ -265,6 +267,8 @@ pub(crate) use self::unix::*; pub(crate) use self::websocket::*; #[cfg(feature = "sinks-websocket-server")] pub(crate) use self::websocket_server::*; +#[cfg(feature = "transforms-window")] +pub(crate) use self::window::*; #[cfg(windows)] pub(crate) use self::windows::*; pub use self::{ diff --git a/src/internal_events/window.rs b/src/internal_events/window.rs new file mode 100644 index 0000000000000..5b2f666edcb11 --- /dev/null +++ b/src/internal_events/window.rs @@ -0,0 +1,14 @@ +use vector_lib::internal_event::{ComponentEventsDropped, Count, Registered, INTENTIONAL}; + +vector_lib::registered_event!( + WindowEventsDropped => { + events_dropped: Registered> + = register!(ComponentEventsDropped::::from( + "The buffer was full" + )), + } + + fn emit(&self, data: Count) { + self.events_dropped.emit(data); + } +); diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index ea3e9aa163afe..0247897ee295a 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -29,6 +29,8 @@ pub mod route; pub mod tag_cardinality_limit; #[cfg(feature = "transforms-throttle")] pub mod throttle; +#[cfg(feature = "transforms-window")] +pub mod window; pub use vector_lib::transform::{ FunctionTransform, OutputBuffer, SyncTransform, TaskTransform, Transform, TransformOutputs, diff --git a/src/transforms/window/config.rs b/src/transforms/window/config.rs new file mode 100644 index 0000000000000..24bada59950c3 --- /dev/null +++ b/src/transforms/window/config.rs @@ -0,0 +1,74 @@ +use vector_lib::config::{clone_input_definitions, LogNamespace}; +use vector_lib::configurable::configurable_component; + +use crate::{ + conditions::AnyCondition, + config::{ + DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, + TransformOutput, + }, + schema, + transforms::Transform, +}; + +use super::transform::Window; + +/// Configuration for the `window` transform. +#[configurable_component(transform( + "window", + "Apply a buffered sliding window over the stream of events and flush it based on supplied criteria" +))] +#[derive(Clone, Debug)] +#[serde(deny_unknown_fields)] +pub struct WindowConfig { + /// A condition used to pass events through the transform without buffering + pub pass_when: Option, + /// A condition used to flush the events + pub flush_when: AnyCondition, + /// The maximum number of events to keep before the event matched by the flush_when condition + pub events_before: Option, + /// The maximum number of events to keep after the event matched by the flush_when condition + pub events_after: Option, +} + +impl GenerateConfig for WindowConfig { + fn generate_config() -> toml::Value { + toml::from_str(r#"flush_when = ".message == \"value\"""#).unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "window")] +impl TransformConfig for WindowConfig { + async fn build(&self, context: &TransformContext) -> crate::Result { + Ok(Transform::function( + Window::new( + self.pass_when + .as_ref() + .map(|condition| condition.build(&context.enrichment_tables)) + .transpose()?, + self.flush_when.build(&context.enrichment_tables)?, + self.events_before.unwrap_or(100), + self.events_after.unwrap_or(0), + ) + .unwrap(), + )) + } + + fn input(&self) -> Input { + Input::new(DataType::Log) + } + + fn outputs( + &self, + _: vector_lib::enrichment::TableRegistry, + input_definitions: &[(OutputId, schema::Definition)], + _: LogNamespace, + ) -> Vec { + // The event is not modified, so the definition is passed through as-is + vec![TransformOutput::new( + DataType::Log, + clone_input_definitions(input_definitions), + )] + } +} diff --git a/src/transforms/window/mod.rs b/src/transforms/window/mod.rs new file mode 100644 index 0000000000000..3068477ea9559 --- /dev/null +++ b/src/transforms/window/mod.rs @@ -0,0 +1,2 @@ +pub mod config; +pub mod transform; diff --git a/src/transforms/window/transform.rs b/src/transforms/window/transform.rs new file mode 100644 index 0000000000000..c221097090523 --- /dev/null +++ b/src/transforms/window/transform.rs @@ -0,0 +1,405 @@ +use std::collections::VecDeque; + +use vector_lib::internal_event::{Count, InternalEventHandle as _, Registered}; + +use crate::{ + conditions::Condition, + event::Event, + internal_events::WindowEventsDropped, + transforms::{FunctionTransform, OutputBuffer}, +}; + +#[derive(Clone)] +pub struct Window { + // Configuration parameters + pass_when: Option, + flush_when: Condition, + events_before: usize, + events_after: usize, + + // Internal variables + buffer: VecDeque, + events_counter: usize, + events_dropped: Registered, + is_flushing: bool, +} + +impl Window { + pub fn new( + pass_when: Option, + flush_when: Condition, + events_before: usize, + events_after: usize, + ) -> crate::Result { + let buffer = VecDeque::with_capacity(events_before); + + Ok(Window { + pass_when, + flush_when, + events_before, + events_after, + events_dropped: register!(WindowEventsDropped), + buffer, + events_counter: 0, + is_flushing: false, + }) + } +} + +impl FunctionTransform for Window { + fn transform(&mut self, output: &mut OutputBuffer, event: Event) { + let (pass, event) = match self.pass_when.as_ref() { + Some(condition) => { + let (result, event) = condition.check(event); + (result, event) + } + _ => (false, event), + }; + + let (flush, event) = self.flush_when.check(event); + + if self.buffer.capacity() < self.events_before { + self.buffer.reserve(self.events_before); + } + + if pass { + output.push(event); + } else if flush { + if self.events_before > 0 { + self.buffer.drain(..).for_each(|evt| output.push(evt)); + } + + self.events_counter = 0; + self.is_flushing = true; + output.push(event); + } else if self.is_flushing { + self.events_counter += 1; + + if self.events_counter > self.events_after { + self.events_counter = 0; + self.is_flushing = false; + self.events_dropped.emit(Count(1)); + } else { + output.push(event); + } + } else if self.buffer.len() >= self.events_before { + self.buffer.pop_front(); + self.buffer.push_back(event); + self.events_dropped.emit(Count(1)); + } else if self.events_before > 0 { + self.buffer.push_back(event); + } else { + self.events_dropped.emit(Count(1)); + } + } +} + +#[cfg(test)] +mod test { + use std::ops::RangeInclusive; + use tokio::sync::mpsc; + use tokio::sync::mpsc::{Receiver, Sender}; + use tokio_stream::wrappers::ReceiverStream; + use vrl::core::Value; + + use crate::conditions::{AnyCondition, ConditionConfig, VrlConfig}; + use crate::transforms::window::config::WindowConfig; + use crate::{ + event::{Event, LogEvent}, + test_util::components::assert_transform_compliance, + transforms::test::create_topology, + }; + + #[tokio::test] + async fn test_flush() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 1, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_event(&tx, "flush").await; + assert_event("flush", out.recv().await).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_pass() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let pass_when = get_condition("pass"); + let transform_config = get_transform_config(flush_when, Some(pass_when), 1, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_event(&tx, "pass").await; + assert_event("pass", out.recv().await).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_10_in_50() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 50, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=10)).await; + send_event(&tx, "flush").await; + + let mut expected: [&str; 11] = [ + "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", "A10", "flush", + ]; + + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_50_in_10() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 10, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=50)).await; + send_event(&tx, "flush").await; + + let mut expected: [&str; 11] = [ + "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush", + ]; + + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_before_and_after() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 10, 5); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=50)).await; + send_event(&tx, "flush").await; + send_events(&tx, generate_events(51..=70)).await; + + let mut expected: [&str; 16] = [ + "A41", "A42", "A43", "A44", "A45", "A46", "A47", "A48", "A49", "A50", "flush", + "A51", "A52", "A53", "A54", "A55", + ]; + + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_flush_and_pass() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let pass_when = get_condition("pass"); + let transform_config = get_transform_config(flush_when, Some(pass_when), 50, 5); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=5)).await; + send_event(&tx, "pass").await; + send_events(&tx, generate_events(6..=10)).await; + send_event(&tx, "pass").await; + send_event(&tx, "flush").await; + send_event(&tx, "pass").await; + send_events(&tx, generate_events(11..=15)).await; + send_event(&tx, "pass").await; + send_events(&tx, generate_events(16..=20)).await; + + let mut expected: [&str; 20] = [ + "pass", "pass", "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", + "A10", "flush", "pass", "A11", "A12", "A13", "A14", "A15", "pass", + ]; + + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_zero_before() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 0, 5); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=50)).await; + send_event(&tx, "flush").await; + send_events(&tx, generate_events(51..=70)).await; + + let mut expected: [&str; 6] = ["flush", "A51", "A52", "A53", "A54", "A55"]; + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_zero_flush() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let transform_config = get_transform_config(flush_when, None, 0, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + send_events(&tx, generate_events(1..=50)).await; + send_event(&tx, "flush").await; + send_events(&tx, generate_events(51..=70)).await; + + let mut expected: [&str; 1] = ["flush"]; + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn test_zero_pass() { + assert_transform_compliance(async { + let flush_when = get_condition("flush"); + let pass_when = get_condition("pass"); + let transform_config = get_transform_config(flush_when, Some(pass_when), 0, 0); + + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = + create_topology(ReceiverStream::new(rx), transform_config).await; + + let events = generate_events(1..=50); + let more_events = generate_events(51..=70); + + send_events(&tx, events).await; + send_event(&tx, "pass").await; + send_event(&tx, "flush").await; + send_events(&tx, more_events).await; + + let mut expected: [&str; 2] = ["pass", "flush"]; + assert_events(&mut expected, &mut out).await; + + drop(tx); + topology.stop().await; + + assert_eq!(out.recv().await, None); + }) + .await; + } + + const fn get_transform_config( + flush_when: AnyCondition, + pass_when: Option, + events_before: usize, + events_after: usize, + ) -> WindowConfig { + WindowConfig { + flush_when, + pass_when, + events_before: Some(events_before), + events_after: Some(events_after), + } + } + + fn get_condition(message: &str) -> AnyCondition { + AnyCondition::from(ConditionConfig::Vrl(VrlConfig { + source: format!(r#".message == "{message}""#), + runtime: Default::default(), + })) + } + + fn generate_events(range: RangeInclusive) -> Vec { + range + .map(|n| format!("A{n:02}")) + .map(|m| Event::from(LogEvent::from(m))) + .collect::>() + } + + async fn send_events(tx: &Sender, events: Vec) { + for event in events { + tx.send(event).await.unwrap(); + } + } + + async fn send_event(tx: &Sender, message: &str) { + tx.send(Event::from(LogEvent::from(message))).await.unwrap(); + } + + async fn assert_event(message: &str, event: Option) { + assert_eq!( + &Value::from(message), + event.unwrap().as_log().get("message").unwrap() + ); + } + + async fn assert_events(messages: &mut [&str], out: &mut Receiver) { + for message in messages { + assert_event(message, out.recv().await).await; + } + } +} diff --git a/website/content/en/docs/reference/configuration/transforms/window.md b/website/content/en/docs/reference/configuration/transforms/window.md new file mode 100644 index 0000000000000..7ab624bf0863e --- /dev/null +++ b/website/content/en/docs/reference/configuration/transforms/window.md @@ -0,0 +1,14 @@ +--- +title: Window +description: When a condition is met, flush recent events to the output +component_kind: transform +layout: component +tags: ["filter", "component", "transform", "logs"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... +*/}} diff --git a/website/cue/reference/components.cue b/website/cue/reference/components.cue index ba4879ffad47f..5a350c997a1d3 100644 --- a/website/cue/reference/components.cue +++ b/website/cue/reference/components.cue @@ -204,6 +204,7 @@ components: { exclusive_route?: #FeaturesExclusiveRoute sanitize?: #FeaturesSanitize shape?: #FeaturesShape + window?: #FeaturesWindow } if Args.kind == "sink" { @@ -335,6 +336,8 @@ components: { #FeaturesShape: {} + #FeaturesWindow: {} + #FeaturesSend: { _args: { egress_method: string diff --git a/website/cue/reference/components/transforms/base/window.cue b/website/cue/reference/components/transforms/base/window.cue new file mode 100644 index 0000000000000..b52f0816f287f --- /dev/null +++ b/website/cue/reference/components/transforms/base/window.cue @@ -0,0 +1,43 @@ +package metadata + +base: components: transforms: window: configuration: { + flush_when: { + description: """ + A condition used to flush the events. + + If the condition resolves to `true` for an event, the whole window is immediately flushed, + including the event itself, and any following events it `events_after` is more than zero. + """ + required: true + type: condition: {} + } + pass_when: { + description: """ + A condition used to pass events through the transform without buffering. + + If the condition resolves to `true` for an event, the event is immediatelly passed through + without preserving the original order of events. Use with caution if the sink cannot handle + out of order events. + """ + required: false + type: condition: {} + } + events_before: { + description: """ + The maximum number of events to keep before the event matched by the `flush_when` condition. + """ + required: false + type: uint: { + default: 100 + } + } + events_after: { + description: """ + The maximum number of events to keep after the event matched by the `flush_when` condition. + """ + required: false + type: uint: { + default: 0 + } + } +} diff --git a/website/cue/reference/components/transforms/window.cue b/website/cue/reference/components/transforms/window.cue new file mode 100644 index 0000000000000..992dbbc944c14 --- /dev/null +++ b/website/cue/reference/components/transforms/window.cue @@ -0,0 +1,143 @@ +package metadata + +components: transforms: window: { + title: "Window" + + description: """ + When a condition is met, flush recent events to the output. Otherwise silently drop non-matching events. + """ + + classes: { + commonly_used: false + development: "beta" + egress_method: "stream" + stateful: true + } + + features: { + filter: {} + } + + support: { + requirements: [] + warnings: [] + notices: [] + } + + configuration: base.components.transforms.window.configuration + + input: { + logs: true + metrics: null + traces: false + } + + examples: [ + { + title: "Flush recent events when an error happens" + input: [ + { log: { message: "A01", level: "info" } }, + { log: { message: "A02", level: "debug" } }, + { log: { message: "A03", level: "info" } }, + { log: { message: "A04", level: "debug" } }, + { log: { message: "A05", level: "error" } }, + { log: { message: "A06", level: "debug" } }, + { log: { message: "A07", level: "warning" } }, + { log: { message: "A08", level: "info" } }, + { log: { message: "A09", level: "debug" } }, + { log: { message: "A10", level: "info" } }, + ] + + configuration: { + flush_when: #".level == "error""# + events_before: 2 + events_after: 2 + } + + output: [ + { log: { message: "A03", level: "info" } }, + { log: { message: "A04", level: "debug" } }, + { log: { message: "A05", level: "error" } }, + { log: { message: "A06", level: "debug" } }, + { log: { message: "A07", level: "warning" } }, + ] + }, + + { + title: "Pass events through without preserving the order" + input: [ + { log: { message: "A01", level: "info" } }, + { log: { message: "A02", level: "debug" } }, + { log: { message: "A03", level: "info" } }, + { log: { message: "A04", level: "debug" } }, + { log: { message: "A05", level: "error" } }, + { log: { message: "A06", level: "debug" } }, + { log: { message: "A07", level: "warning" } }, + { log: { message: "A08", level: "info" } }, + { log: { message: "A09", level: "debug" } }, + { log: { message: "A10", level: "info" } }, + ] + + configuration: { + flush_when: #".level == "error""# + pass_when: #".level == "info""# + events_before: 2 + events_after: 2 + } + + output: [ + { log: { message: "A01", level: "info" } }, + { log: { message: "A03", level: "info" } }, + { log: { message: "A02", level: "debug" } }, + { log: { message: "A04", level: "debug" } }, + { log: { message: "A05", level: "error" } }, + { log: { message: "A06", level: "debug" } }, + { log: { message: "A07", level: "warning" } }, + { log: { message: "A08", level: "info" } }, + { log: { message: "A10", level: "info" } }, + ] + }, + ] + +how_it_works: { + advantages: { + title: "Advantages of Use" + body: """ + A common way to reduce log volume from a verbose system is to filter out less important messages, and only + ingest e.g. errors and warnings. However an error message by itself may not be sufficient to determine the + cause, as surrounding events often contain important context information leading to the failure. + + The `window` transform offers an approach that allows for reduction of log volume by filtering out logs + when the system is healthy, but preserving detailed logs when they are most relevant. + """ + } + + sliding_window: { + title: "Sliding Window" + body: """ + As the stream of events passes through the transform, it is observed though a "window" that spans between + `events_before` and `events_after` relative to an event matched by the `flush_when` condition. When the + condition is matched, the whole window is flushed to the output. This is also known as backtrace logging or + ring buffer logging. + + {{< svg "img/sliding-window.svg" >}} + + Past events are stored in a memory buffer with the capacity of `events_before`. The buffer is not persistent, + so in case of a hard system crash, all the buffered events will be lost. + + Future events are counted from the event matched by the `flush_when` condition until `events_after` number + of events is reached. Otherwise the transform functions as a `filter` transform, silently dropping + non-matching events. + + If the `flush_when` condition is matched before the buffer fills up, it will be flushed again. If the flush + condtion is triggered often enough, e.g. the system is constantly logging errors, the transform may always + be in the flushing state, meaning that no events will be filtered. Therefore it works best for conditions + that are relatively uncommon. + """ + } + } + + telemetry: metrics: { + stale_events_flushed_total: components.sources.internal_metrics.output.metrics.stale_events_flushed_total + } +} diff --git a/website/static/img/sliding-window.svg b/website/static/img/sliding-window.svg new file mode 100644 index 0000000000000..165a060e2048b --- /dev/null +++ b/website/static/img/sliding-window.svg @@ -0,0 +1,418 @@ + + + + + + + + + + A + + + + B + + + + C + + + + D + + + + E + + + + F + + + + G + + + + H + + + + I + + + + J + + + + K + + + + L + + + + M + + + + + + Event of interest + Events before + + + + + + + + + Events after + + From 94292b82ac2a0bf0fbba87892dccb082a2a57a58 Mon Sep 17 00:00:00 2001 From: Linas Zvirblis Date: Fri, 7 Mar 2025 13:52:08 +0100 Subject: [PATCH 2/8] Fix spelling errors --- website/cue/reference/components/transforms/base/window.cue | 2 +- website/cue/reference/components/transforms/window.cue | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/website/cue/reference/components/transforms/base/window.cue b/website/cue/reference/components/transforms/base/window.cue index b52f0816f287f..67eaf05c0a050 100644 --- a/website/cue/reference/components/transforms/base/window.cue +++ b/website/cue/reference/components/transforms/base/window.cue @@ -15,7 +15,7 @@ base: components: transforms: window: configuration: { description: """ A condition used to pass events through the transform without buffering. - If the condition resolves to `true` for an event, the event is immediatelly passed through + If the condition resolves to `true` for an event, the event is immediately passed through without preserving the original order of events. Use with caution if the sink cannot handle out of order events. """ diff --git a/website/cue/reference/components/transforms/window.cue b/website/cue/reference/components/transforms/window.cue index 992dbbc944c14..ce8000c452b6d 100644 --- a/website/cue/reference/components/transforms/window.cue +++ b/website/cue/reference/components/transforms/window.cue @@ -130,7 +130,7 @@ how_it_works: { non-matching events. If the `flush_when` condition is matched before the buffer fills up, it will be flushed again. If the flush - condtion is triggered often enough, e.g. the system is constantly logging errors, the transform may always + condition is triggered often enough, e.g. the system is constantly logging errors, the transform may always be in the flushing state, meaning that no events will be filtered. Therefore it works best for conditions that are relatively uncommon. """ From 798a187783a9e06d437f63e29a7841492b4b738f Mon Sep 17 00:00:00 2001 From: Linas Zvirblis Date: Fri, 7 Mar 2025 17:46:20 +0100 Subject: [PATCH 3/8] Reformat cue docs --- .../components/transforms/window.cue | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/website/cue/reference/components/transforms/window.cue b/website/cue/reference/components/transforms/window.cue index ce8000c452b6d..c7f975ba80305 100644 --- a/website/cue/reference/components/transforms/window.cue +++ b/website/cue/reference/components/transforms/window.cue @@ -36,71 +36,71 @@ components: transforms: window: { { title: "Flush recent events when an error happens" input: [ - { log: { message: "A01", level: "info" } }, - { log: { message: "A02", level: "debug" } }, - { log: { message: "A03", level: "info" } }, - { log: { message: "A04", level: "debug" } }, - { log: { message: "A05", level: "error" } }, - { log: { message: "A06", level: "debug" } }, - { log: { message: "A07", level: "warning" } }, - { log: { message: "A08", level: "info" } }, - { log: { message: "A09", level: "debug" } }, - { log: { message: "A10", level: "info" } }, + {log: {message: "A01", level: "info"}}, + {log: {message: "A02", level: "debug"}}, + {log: {message: "A03", level: "info"}}, + {log: {message: "A04", level: "debug"}}, + {log: {message: "A05", level: "error"}}, + {log: {message: "A06", level: "debug"}}, + {log: {message: "A07", level: "warning"}}, + {log: {message: "A08", level: "info"}}, + {log: {message: "A09", level: "debug"}}, + {log: {message: "A10", level: "info"}}, ] configuration: { - flush_when: #".level == "error""# + flush_when: #".level == "error""# events_before: 2 - events_after: 2 + events_after: 2 } output: [ - { log: { message: "A03", level: "info" } }, - { log: { message: "A04", level: "debug" } }, - { log: { message: "A05", level: "error" } }, - { log: { message: "A06", level: "debug" } }, - { log: { message: "A07", level: "warning" } }, + {log: {message: "A03", level: "info"}}, + {log: {message: "A04", level: "debug"}}, + {log: {message: "A05", level: "error"}}, + {log: {message: "A06", level: "debug"}}, + {log: {message: "A07", level: "warning"}}, ] }, { title: "Pass events through without preserving the order" input: [ - { log: { message: "A01", level: "info" } }, - { log: { message: "A02", level: "debug" } }, - { log: { message: "A03", level: "info" } }, - { log: { message: "A04", level: "debug" } }, - { log: { message: "A05", level: "error" } }, - { log: { message: "A06", level: "debug" } }, - { log: { message: "A07", level: "warning" } }, - { log: { message: "A08", level: "info" } }, - { log: { message: "A09", level: "debug" } }, - { log: { message: "A10", level: "info" } }, + {log: {message: "A01", level: "info"}}, + {log: {message: "A02", level: "debug"}}, + {log: {message: "A03", level: "info"}}, + {log: {message: "A04", level: "debug"}}, + {log: {message: "A05", level: "error"}}, + {log: {message: "A06", level: "debug"}}, + {log: {message: "A07", level: "warning"}}, + {log: {message: "A08", level: "info"}}, + {log: {message: "A09", level: "debug"}}, + {log: {message: "A10", level: "info"}}, ] configuration: { - flush_when: #".level == "error""# - pass_when: #".level == "info""# + flush_when: #".level == "error""# + pass_when: #".level == "info""# events_before: 2 - events_after: 2 + events_after: 2 } output: [ - { log: { message: "A01", level: "info" } }, - { log: { message: "A03", level: "info" } }, - { log: { message: "A02", level: "debug" } }, - { log: { message: "A04", level: "debug" } }, - { log: { message: "A05", level: "error" } }, - { log: { message: "A06", level: "debug" } }, - { log: { message: "A07", level: "warning" } }, - { log: { message: "A08", level: "info" } }, - { log: { message: "A10", level: "info" } }, + {log: {message: "A01", level: "info"}}, + {log: {message: "A03", level: "info"}}, + {log: {message: "A02", level: "debug"}}, + {log: {message: "A04", level: "debug"}}, + {log: {message: "A05", level: "error"}}, + {log: {message: "A06", level: "debug"}}, + {log: {message: "A07", level: "warning"}}, + {log: {message: "A08", level: "info"}}, + {log: {message: "A10", level: "info"}}, ] }, ] -how_it_works: { - advantages: { + how_it_works: { + advantages: { title: "Advantages of Use" body: """ A common way to reduce log volume from a verbose system is to filter out less important messages, and only From 0affdcea10a8fbc2cf28e3c1141d7974814bc484 Mon Sep 17 00:00:00 2001 From: Linas Zvirblis Date: Wed, 12 Mar 2025 14:55:37 +0100 Subject: [PATCH 4/8] Apply documentation changes from code review Co-authored-by: Joe Peeples --- website/cue/reference/components/transforms/base/window.cue | 2 +- website/cue/reference/components/transforms/window.cue | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/website/cue/reference/components/transforms/base/window.cue b/website/cue/reference/components/transforms/base/window.cue index 67eaf05c0a050..78f533d9aae6b 100644 --- a/website/cue/reference/components/transforms/base/window.cue +++ b/website/cue/reference/components/transforms/base/window.cue @@ -6,7 +6,7 @@ base: components: transforms: window: configuration: { A condition used to flush the events. If the condition resolves to `true` for an event, the whole window is immediately flushed, - including the event itself, and any following events it `events_after` is more than zero. + including the event itself, and any following events if `events_after` is more than zero. """ required: true type: condition: {} diff --git a/website/cue/reference/components/transforms/window.cue b/website/cue/reference/components/transforms/window.cue index c7f975ba80305..58adcfe0caee3 100644 --- a/website/cue/reference/components/transforms/window.cue +++ b/website/cue/reference/components/transforms/window.cue @@ -104,10 +104,10 @@ components: transforms: window: { title: "Advantages of Use" body: """ A common way to reduce log volume from a verbose system is to filter out less important messages, and only - ingest e.g. errors and warnings. However an error message by itself may not be sufficient to determine the + ingest errors and warnings. However, an error message by itself may not be sufficient to determine the cause, as surrounding events often contain important context information leading to the failure. - The `window` transform offers an approach that allows for reduction of log volume by filtering out logs + The `window` transform allows for reduction of log volume by filtering out logs when the system is healthy, but preserving detailed logs when they are most relevant. """ } @@ -130,7 +130,7 @@ components: transforms: window: { non-matching events. If the `flush_when` condition is matched before the buffer fills up, it will be flushed again. If the flush - condition is triggered often enough, e.g. the system is constantly logging errors, the transform may always + condition is triggered often enough (for example, the system is constantly logging errors), the transform may always be in the flushing state, meaning that no events will be filtered. Therefore it works best for conditions that are relatively uncommon. """ From 3eb5a46d29fe89a88ebe307b679b0a63c375534f Mon Sep 17 00:00:00 2001 From: Linas Zvirblis Date: Thu, 13 Mar 2025 10:26:27 +0100 Subject: [PATCH 5/8] Generate component docs --- src/transforms/window/config.rs | 36 ++++++++++++++----- src/transforms/window/transform.rs | 4 +-- .../components/transforms/base/window.cue | 28 ++++++--------- 3 files changed, 40 insertions(+), 28 deletions(-) diff --git a/src/transforms/window/config.rs b/src/transforms/window/config.rs index 24bada59950c3..9c13b40aa71ea 100644 --- a/src/transforms/window/config.rs +++ b/src/transforms/window/config.rs @@ -21,14 +21,26 @@ use super::transform::Window; #[derive(Clone, Debug)] #[serde(deny_unknown_fields)] pub struct WindowConfig { - /// A condition used to pass events through the transform without buffering + /// A condition used to pass events through the transform without buffering. + /// + /// If the condition resolves to `true` for an event, the event is immediately passed through + /// without preserving the original order of events. Use with caution if the sink cannot handle + /// out of order events. pub pass_when: Option, - /// A condition used to flush the events + + /// A condition used to flush the events. + /// + /// If the condition resolves to `true` for an event, the whole window is immediately flushed, + /// including the event itself, and any following events if `events_after` is more than zero. pub flush_when: AnyCondition, - /// The maximum number of events to keep before the event matched by the flush_when condition - pub events_before: Option, - /// The maximum number of events to keep after the event matched by the flush_when condition - pub events_after: Option, + + /// The maximum number of events to keep before the event matched by the `flush_when` condition. + #[serde(default = "default_events_before")] + pub events_before: usize, + + /// The maximum number of events to keep after the event matched by the `flush_when` condition. + #[serde(default = "default_events_after")] + pub events_after: usize, } impl GenerateConfig for WindowConfig { @@ -37,6 +49,14 @@ impl GenerateConfig for WindowConfig { } } +const fn default_events_before() -> usize { + 100 +} + +const fn default_events_after() -> usize { + 0 +} + #[async_trait::async_trait] #[typetag::serde(name = "window")] impl TransformConfig for WindowConfig { @@ -48,8 +68,8 @@ impl TransformConfig for WindowConfig { .map(|condition| condition.build(&context.enrichment_tables)) .transpose()?, self.flush_when.build(&context.enrichment_tables)?, - self.events_before.unwrap_or(100), - self.events_after.unwrap_or(0), + self.events_before, + self.events_after, ) .unwrap(), )) diff --git a/src/transforms/window/transform.rs b/src/transforms/window/transform.rs index c221097090523..22be246ab9407 100644 --- a/src/transforms/window/transform.rs +++ b/src/transforms/window/transform.rs @@ -361,8 +361,8 @@ mod test { WindowConfig { flush_when, pass_when, - events_before: Some(events_before), - events_after: Some(events_after), + events_before, + events_after, } } diff --git a/website/cue/reference/components/transforms/base/window.cue b/website/cue/reference/components/transforms/base/window.cue index 78f533d9aae6b..7e7fcecdb3b96 100644 --- a/website/cue/reference/components/transforms/base/window.cue +++ b/website/cue/reference/components/transforms/base/window.cue @@ -1,6 +1,16 @@ package metadata base: components: transforms: window: configuration: { + events_after: { + description: "The maximum number of events to keep after the event matched by the `flush_when` condition." + required: false + type: uint: default: 0 + } + events_before: { + description: "The maximum number of events to keep before the event matched by the `flush_when` condition." + required: false + type: uint: default: 100 + } flush_when: { description: """ A condition used to flush the events. @@ -22,22 +32,4 @@ base: components: transforms: window: configuration: { required: false type: condition: {} } - events_before: { - description: """ - The maximum number of events to keep before the event matched by the `flush_when` condition. - """ - required: false - type: uint: { - default: 100 - } - } - events_after: { - description: """ - The maximum number of events to keep after the event matched by the `flush_when` condition. - """ - required: false - type: uint: { - default: 0 - } - } } From 92ae4af46a2e2409b73fda1c74e3545a1ece5945 Mon Sep 17 00:00:00 2001 From: Linas Zvirblis Date: Thu, 13 Mar 2025 10:50:50 +0100 Subject: [PATCH 6/8] Update transform description --- .../reference/configuration/transforms/window.md | 2 +- .../reference/components/transforms/window.cue | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/website/content/en/docs/reference/configuration/transforms/window.md b/website/content/en/docs/reference/configuration/transforms/window.md index 7ab624bf0863e..ade190195844f 100644 --- a/website/content/en/docs/reference/configuration/transforms/window.md +++ b/website/content/en/docs/reference/configuration/transforms/window.md @@ -1,6 +1,6 @@ --- title: Window -description: When a condition is met, flush recent events to the output +description: A variant of ring buffer or backtrace logging implemented as a sliding window component_kind: transform layout: component tags: ["filter", "component", "transform", "logs"] diff --git a/website/cue/reference/components/transforms/window.cue b/website/cue/reference/components/transforms/window.cue index 58adcfe0caee3..6d64f421978da 100644 --- a/website/cue/reference/components/transforms/window.cue +++ b/website/cue/reference/components/transforms/window.cue @@ -4,7 +4,8 @@ components: transforms: window: { title: "Window" description: """ - When a condition is met, flush recent events to the output. Otherwise silently drop non-matching events. + A variant of ring buffer or backtrace logging implemented as a sliding window. Keeps events in a buffer until + the `flush_when` condition is matched. When the buffer is full, the oldest events are dropped. """ classes: { @@ -122,16 +123,16 @@ components: transforms: window: { {{< svg "img/sliding-window.svg" >}} - Past events are stored in a memory buffer with the capacity of `events_before`. The buffer is not persistent, - so in case of a hard system crash, all the buffered events will be lost. + Past events are stored in a memory buffer with the capacity of `events_before`. When the buffer is full, + the oldest events are dropped to make space for new ones. The buffer is not persistent, so in case of a hard + system crash, all the buffered events will be lost. Future events are counted from the event matched by the `flush_when` condition until `events_after` number - of events is reached. Otherwise the transform functions as a `filter` transform, silently dropping - non-matching events. + of events is reached. If the `flush_when` condition is matched before the buffer fills up, it will be flushed again. If the flush - condition is triggered often enough (for example, the system is constantly logging errors), the transform may always - be in the flushing state, meaning that no events will be filtered. Therefore it works best for conditions + condition is triggered often enough (for example, the system is constantly logging errors), the transform may + always be in the flushing state, meaning that no events will be filtered. Therefore it works best for conditions that are relatively uncommon. """ } From f5dcdf44fbaf4975f233545e36ace91242119aa8 Mon Sep 17 00:00:00 2001 From: Linas Zvirblis Date: Tue, 8 Apr 2025 14:32:46 +0200 Subject: [PATCH 7/8] Add a more detailed changelog --- changelog.d/window-transform.feature.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/changelog.d/window-transform.feature.md b/changelog.d/window-transform.feature.md index fd532d0138215..b3f366108c625 100644 --- a/changelog.d/window-transform.feature.md +++ b/changelog.d/window-transform.feature.md @@ -1,3 +1,5 @@ -Add new `window` transform. +Add a new `window` transform, a variant of ring buffer or backtrace logging implemented as a sliding window. +Allows for reduction of log volume by filtering out logs when the system is healthy, but preserving detailed +logs when they are most relevant. authors: ilinas From c59dea224e387cbd02d0eb8f9e3be6251dc887de Mon Sep 17 00:00:00 2001 From: Linas Zvirblis Date: Wed, 9 Apr 2025 10:10:13 +0200 Subject: [PATCH 8/8] Rename config variables, update docs --- src/transforms/window/config.rs | 20 ++--- src/transforms/window/transform.rs | 78 +++++++++---------- .../components/transforms/base/window.cue | 30 +++---- .../components/transforms/window.cue | 20 ++--- 4 files changed, 74 insertions(+), 74 deletions(-) diff --git a/src/transforms/window/config.rs b/src/transforms/window/config.rs index 9c13b40aa71ea..21a2a5d36cafc 100644 --- a/src/transforms/window/config.rs +++ b/src/transforms/window/config.rs @@ -23,24 +23,24 @@ use super::transform::Window; pub struct WindowConfig { /// A condition used to pass events through the transform without buffering. /// - /// If the condition resolves to `true` for an event, the event is immediately passed through - /// without preserving the original order of events. Use with caution if the sink cannot handle - /// out of order events. - pub pass_when: Option, + /// If the condition resolves to `true` for an event, the event is immediately forwarded without + /// buffering and without preserving the original order of events. Use with caution if the sink + /// cannot handle out of order events. + pub forward_when: Option, /// A condition used to flush the events. /// /// If the condition resolves to `true` for an event, the whole window is immediately flushed, - /// including the event itself, and any following events if `events_after` is more than zero. + /// including the event itself, and any following events if `num_events_after` is more than zero. pub flush_when: AnyCondition, /// The maximum number of events to keep before the event matched by the `flush_when` condition. #[serde(default = "default_events_before")] - pub events_before: usize, + pub num_events_before: usize, /// The maximum number of events to keep after the event matched by the `flush_when` condition. #[serde(default = "default_events_after")] - pub events_after: usize, + pub num_events_after: usize, } impl GenerateConfig for WindowConfig { @@ -63,13 +63,13 @@ impl TransformConfig for WindowConfig { async fn build(&self, context: &TransformContext) -> crate::Result { Ok(Transform::function( Window::new( - self.pass_when + self.forward_when .as_ref() .map(|condition| condition.build(&context.enrichment_tables)) .transpose()?, self.flush_when.build(&context.enrichment_tables)?, - self.events_before, - self.events_after, + self.num_events_before, + self.num_events_after, ) .unwrap(), )) diff --git a/src/transforms/window/transform.rs b/src/transforms/window/transform.rs index 22be246ab9407..2149fbad61d0b 100644 --- a/src/transforms/window/transform.rs +++ b/src/transforms/window/transform.rs @@ -12,10 +12,10 @@ use crate::{ #[derive(Clone)] pub struct Window { // Configuration parameters - pass_when: Option, + forward_when: Option, flush_when: Condition, - events_before: usize, - events_after: usize, + num_events_before: usize, + num_events_after: usize, // Internal variables buffer: VecDeque, @@ -26,18 +26,18 @@ pub struct Window { impl Window { pub fn new( - pass_when: Option, + forward_when: Option, flush_when: Condition, - events_before: usize, - events_after: usize, + num_events_before: usize, + num_events_after: usize, ) -> crate::Result { - let buffer = VecDeque::with_capacity(events_before); + let buffer = VecDeque::with_capacity(num_events_before); Ok(Window { - pass_when, + forward_when, flush_when, - events_before, - events_after, + num_events_before, + num_events_after, events_dropped: register!(WindowEventsDropped), buffer, events_counter: 0, @@ -48,7 +48,7 @@ impl Window { impl FunctionTransform for Window { fn transform(&mut self, output: &mut OutputBuffer, event: Event) { - let (pass, event) = match self.pass_when.as_ref() { + let (pass, event) = match self.forward_when.as_ref() { Some(condition) => { let (result, event) = condition.check(event); (result, event) @@ -58,14 +58,14 @@ impl FunctionTransform for Window { let (flush, event) = self.flush_when.check(event); - if self.buffer.capacity() < self.events_before { - self.buffer.reserve(self.events_before); + if self.buffer.capacity() < self.num_events_before { + self.buffer.reserve(self.num_events_before); } if pass { output.push(event); } else if flush { - if self.events_before > 0 { + if self.num_events_before > 0 { self.buffer.drain(..).for_each(|evt| output.push(evt)); } @@ -75,18 +75,18 @@ impl FunctionTransform for Window { } else if self.is_flushing { self.events_counter += 1; - if self.events_counter > self.events_after { + if self.events_counter > self.num_events_after { self.events_counter = 0; self.is_flushing = false; self.events_dropped.emit(Count(1)); } else { output.push(event); } - } else if self.buffer.len() >= self.events_before { + } else if self.buffer.len() >= self.num_events_before { self.buffer.pop_front(); self.buffer.push_back(event); self.events_dropped.emit(Count(1)); - } else if self.events_before > 0 { + } else if self.num_events_before > 0 { self.buffer.push_back(event); } else { self.events_dropped.emit(Count(1)); @@ -135,15 +135,15 @@ mod test { async fn test_pass() { assert_transform_compliance(async { let flush_when = get_condition("flush"); - let pass_when = get_condition("pass"); - let transform_config = get_transform_config(flush_when, Some(pass_when), 1, 0); + let forward_when = get_condition("forward"); + let transform_config = get_transform_config(flush_when, Some(forward_when), 1, 0); let (tx, rx) = mpsc::channel(1); let (topology, mut out) = create_topology(ReceiverStream::new(rx), transform_config).await; - send_event(&tx, "pass").await; - assert_event("pass", out.recv().await).await; + send_event(&tx, "forward").await; + assert_event("forward", out.recv().await).await; drop(tx); topology.stop().await; @@ -240,26 +240,26 @@ mod test { async fn test_flush_and_pass() { assert_transform_compliance(async { let flush_when = get_condition("flush"); - let pass_when = get_condition("pass"); - let transform_config = get_transform_config(flush_when, Some(pass_when), 50, 5); + let forward_when = get_condition("forward"); + let transform_config = get_transform_config(flush_when, Some(forward_when), 50, 5); let (tx, rx) = mpsc::channel(1); let (topology, mut out) = create_topology(ReceiverStream::new(rx), transform_config).await; send_events(&tx, generate_events(1..=5)).await; - send_event(&tx, "pass").await; + send_event(&tx, "forward").await; send_events(&tx, generate_events(6..=10)).await; - send_event(&tx, "pass").await; + send_event(&tx, "forward").await; send_event(&tx, "flush").await; - send_event(&tx, "pass").await; + send_event(&tx, "forward").await; send_events(&tx, generate_events(11..=15)).await; - send_event(&tx, "pass").await; + send_event(&tx, "forward").await; send_events(&tx, generate_events(16..=20)).await; let mut expected: [&str; 20] = [ - "pass", "pass", "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", "A09", - "A10", "flush", "pass", "A11", "A12", "A13", "A14", "A15", "pass", + "forward", "forward", "A01", "A02", "A03", "A04", "A05", "A06", "A07", "A08", + "A09", "A10", "flush", "forward", "A11", "A12", "A13", "A14", "A15", "forward", ]; assert_events(&mut expected, &mut out).await; @@ -326,8 +326,8 @@ mod test { async fn test_zero_pass() { assert_transform_compliance(async { let flush_when = get_condition("flush"); - let pass_when = get_condition("pass"); - let transform_config = get_transform_config(flush_when, Some(pass_when), 0, 0); + let forward_when = get_condition("forward"); + let transform_config = get_transform_config(flush_when, Some(forward_when), 0, 0); let (tx, rx) = mpsc::channel(1); let (topology, mut out) = @@ -337,11 +337,11 @@ mod test { let more_events = generate_events(51..=70); send_events(&tx, events).await; - send_event(&tx, "pass").await; + send_event(&tx, "forward").await; send_event(&tx, "flush").await; send_events(&tx, more_events).await; - let mut expected: [&str; 2] = ["pass", "flush"]; + let mut expected: [&str; 2] = ["forward", "flush"]; assert_events(&mut expected, &mut out).await; drop(tx); @@ -354,15 +354,15 @@ mod test { const fn get_transform_config( flush_when: AnyCondition, - pass_when: Option, - events_before: usize, - events_after: usize, + forward_when: Option, + num_events_before: usize, + num_events_after: usize, ) -> WindowConfig { WindowConfig { flush_when, - pass_when, - events_before, - events_after, + forward_when, + num_events_before, + num_events_after, } } diff --git a/website/cue/reference/components/transforms/base/window.cue b/website/cue/reference/components/transforms/base/window.cue index 7e7fcecdb3b96..6eeb587c9ece0 100644 --- a/website/cue/reference/components/transforms/base/window.cue +++ b/website/cue/reference/components/transforms/base/window.cue @@ -1,35 +1,35 @@ package metadata base: components: transforms: window: configuration: { - events_after: { - description: "The maximum number of events to keep after the event matched by the `flush_when` condition." - required: false - type: uint: default: 0 - } - events_before: { - description: "The maximum number of events to keep before the event matched by the `flush_when` condition." - required: false - type: uint: default: 100 - } flush_when: { description: """ A condition used to flush the events. If the condition resolves to `true` for an event, the whole window is immediately flushed, - including the event itself, and any following events if `events_after` is more than zero. + including the event itself, and any following events if `num_events_after` is more than zero. """ required: true type: condition: {} } - pass_when: { + forward_when: { description: """ A condition used to pass events through the transform without buffering. - If the condition resolves to `true` for an event, the event is immediately passed through - without preserving the original order of events. Use with caution if the sink cannot handle - out of order events. + If the condition resolves to `true` for an event, the event is immediately forwarded without + buffering and without preserving the original order of events. Use with caution if the sink + cannot handle out of order events. """ required: false type: condition: {} } + num_events_after: { + description: "The maximum number of events to keep after the event matched by the `flush_when` condition." + required: false + type: uint: default: 0 + } + num_events_before: { + description: "The maximum number of events to keep before the event matched by the `flush_when` condition." + required: false + type: uint: default: 100 + } } diff --git a/website/cue/reference/components/transforms/window.cue b/website/cue/reference/components/transforms/window.cue index 6d64f421978da..4198d7b1afde5 100644 --- a/website/cue/reference/components/transforms/window.cue +++ b/website/cue/reference/components/transforms/window.cue @@ -50,9 +50,9 @@ components: transforms: window: { ] configuration: { - flush_when: #".level == "error""# - events_before: 2 - events_after: 2 + flush_when: #".level == "error""# + num_events_before: 2 + num_events_after: 2 } output: [ @@ -80,10 +80,10 @@ components: transforms: window: { ] configuration: { - flush_when: #".level == "error""# - pass_when: #".level == "info""# - events_before: 2 - events_after: 2 + flush_when: #".level == "error""# + forward_when: #".level == "info""# + num_events_before: 2 + num_events_after: 2 } output: [ @@ -117,17 +117,17 @@ components: transforms: window: { title: "Sliding Window" body: """ As the stream of events passes through the transform, it is observed though a "window" that spans between - `events_before` and `events_after` relative to an event matched by the `flush_when` condition. When the + `num_events_before` and `num_events_after` relative to an event matched by the `flush_when` condition. When the condition is matched, the whole window is flushed to the output. This is also known as backtrace logging or ring buffer logging. {{< svg "img/sliding-window.svg" >}} - Past events are stored in a memory buffer with the capacity of `events_before`. When the buffer is full, + Past events are stored in a memory buffer with the capacity of `num_events_before`. When the buffer is full, the oldest events are dropped to make space for new ones. The buffer is not persistent, so in case of a hard system crash, all the buffered events will be lost. - Future events are counted from the event matched by the `flush_when` condition until `events_after` number + Future events are counted from the event matched by the `flush_when` condition until `num_events_after` number of events is reached. If the `flush_when` condition is matched before the buffer fills up, it will be flushed again. If the flush