Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ transforms-logs = [
"transforms-aws_ec2_metadata",
"transforms-dedupe",
"transforms-filter",
"transforms-window",
"transforms-log_to_metric",
"transforms-lua",
"transforms-metric_to_log",
Expand All @@ -696,6 +697,7 @@ transforms-aggregate = []
transforms-aws_ec2_metadata = ["dep:arc-swap"]
transforms-dedupe = ["transforms-impl-dedupe"]
transforms-filter = []
transforms-window = []
transforms-log_to_metric = []
transforms-lua = ["dep:mlua", "vector-lib/lua"]
transforms-metric_to_log = []
Expand Down
5 changes: 5 additions & 0 deletions changelog.d/window-transform.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Add a new `window` transform, a variant of ring buffer or backtrace logging implemented as a sliding window.
It reduces log volume by filtering out logs when the system is healthy, while still preserving detailed
logs when they are most relevant.

authors: ilinas
4 changes: 4 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ mod unix;
mod websocket;
#[cfg(feature = "sinks-websocket-server")]
mod websocket_server;
#[cfg(feature = "transforms-window")]
mod window;

#[cfg(any(
feature = "sources-file",
Expand Down Expand Up @@ -265,6 +267,8 @@ pub(crate) use self::unix::*;
pub(crate) use self::websocket::*;
#[cfg(feature = "sinks-websocket-server")]
pub(crate) use self::websocket_server::*;
#[cfg(feature = "transforms-window")]
pub(crate) use self::window::*;
#[cfg(windows)]
pub(crate) use self::windows::*;
pub use self::{
Expand Down
14 changes: 14 additions & 0 deletions src/internal_events/window.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use vector_lib::internal_event::{ComponentEventsDropped, Count, Registered, INTENTIONAL};

// Registered internal event used to report events dropped by the `window` transform.
//
// It wraps a `ComponentEventsDropped<INTENTIONAL>` handle obtained through `register!` with the
// fixed reason "The buffer was full", so emitting it only requires a `Count`.
vector_lib::registered_event!(
WindowEventsDropped => {
// Handle for the intentional-drop event; the reason string is fixed at registration.
events_dropped: Registered<ComponentEventsDropped<'static, INTENTIONAL>>
= register!(ComponentEventsDropped::<INTENTIONAL>::from(
"The buffer was full"
)),
}

// Forwards the supplied `Count` unchanged to the registered dropped-events handle.
fn emit(&self, data: Count) {
self.events_dropped.emit(data);
}
);
2 changes: 2 additions & 0 deletions src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub mod route;
pub mod tag_cardinality_limit;
#[cfg(feature = "transforms-throttle")]
pub mod throttle;
#[cfg(feature = "transforms-window")]
pub mod window;

pub use vector_lib::transform::{
FunctionTransform, OutputBuffer, SyncTransform, TaskTransform, Transform, TransformOutputs,
Expand Down
94 changes: 94 additions & 0 deletions src/transforms/window/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use vector_lib::config::{clone_input_definitions, LogNamespace};
use vector_lib::configurable::configurable_component;

use crate::{
conditions::AnyCondition,
config::{
DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
TransformOutput,
},
schema,
transforms::Transform,
};

use super::transform::Window;

/// Configuration for the `window` transform.
#[configurable_component(transform(
"window",
"Apply a buffered sliding window over the stream of events and flush it based on supplied criteria"
))]
#[derive(Clone, Debug)]
// Unknown keys in the user's configuration are rejected during deserialization.
#[serde(deny_unknown_fields)]
pub struct WindowConfig {
/// A condition used to pass events through the transform without buffering.
///
/// If the condition resolves to `true` for an event, the event is immediately forwarded without
/// buffering and without preserving the original order of events. Use with caution if the sink
/// cannot handle out of order events.
// Optional: when it is `None`, `build` passes `None` on to `Window::new`.
pub forward_when: Option<AnyCondition>,

/// A condition used to flush the events.
///
/// If the condition resolves to `true` for an event, the whole window is immediately flushed,
/// including the event itself, and any following events if `num_events_after` is more than zero.
// Required: this is the only field without an `Option` wrapper or a serde default.
pub flush_when: AnyCondition,

/// The maximum number of events to keep before the event matched by the `flush_when` condition.
// Falls back to `default_events_before()` when omitted from the configuration.
#[serde(default = "default_events_before")]
pub num_events_before: usize,

/// The maximum number of events to keep after the event matched by the `flush_when` condition.
// Falls back to `default_events_after()` when omitted from the configuration.
#[serde(default = "default_events_after")]
pub num_events_after: usize,
}

impl GenerateConfig for WindowConfig {
    /// Produces an example configuration that sets only the mandatory `flush_when` option.
    fn generate_config() -> toml::Value {
        // The sample is a fixed literal, so a parse failure would be a programming error.
        let sample = r#"flush_when = ".message == \"value\"""#;
        toml::from_str(sample).unwrap()
    }
}

/// Serde default for `num_events_before`: how many events are kept ahead of a flush trigger
/// when the option is not set explicitly.
const fn default_events_before() -> usize {
    100_usize
}

/// Serde default for `num_events_after`: no trailing events are kept after a flush trigger
/// when the option is not set explicitly.
const fn default_events_after() -> usize {
    0_usize
}

#[async_trait::async_trait]
#[typetag::serde(name = "window")]
impl TransformConfig for WindowConfig {
    /// Builds the runtime `Window` transform from this configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if either condition fails to build against the enrichment tables, or if
    /// `Window::new` reports a failure.
    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
        // `forward_when` is optional: build it only when configured, then flip the resulting
        // `Option<Result<_>>` into `Result<Option<_>>` so a failure can be propagated with `?`.
        let forward_when = self
            .forward_when
            .as_ref()
            .map(|condition| condition.build(&context.enrichment_tables))
            .transpose()?;
        let flush_when = self.flush_when.build(&context.enrichment_tables)?;

        // Propagate a constructor failure as a build error instead of panicking.
        // NOTE(review): assumes `Window::new` returns a `Result` whose error converts into
        // `crate::Error` — confirm against `transform.rs`.
        let window = Window::new(
            forward_when,
            flush_when,
            self.num_events_before,
            self.num_events_after,
        )?;

        Ok(Transform::function(window))
    }

    /// The transform accepts log events only.
    fn input(&self) -> Input {
        Input::new(DataType::Log)
    }

    /// Declares a single log output whose schema definitions are copied from the inputs.
    fn outputs(
        &self,
        _: vector_lib::enrichment::TableRegistry,
        input_definitions: &[(OutputId, schema::Definition)],
        _: LogNamespace,
    ) -> Vec<TransformOutput> {
        // The event is not modified, so the definition is passed through as-is
        vec![TransformOutput::new(
            DataType::Log,
            clone_input_definitions(input_definitions),
        )]
    }
}
2 changes: 2 additions & 0 deletions src/transforms/window/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod config;
pub mod transform;
Loading
Loading