diff --git a/build.rs b/build.rs index 31c151f4b61d7..4e8c4f4986304 100644 --- a/build.rs +++ b/build.rs @@ -1,11 +1,4 @@ -use std::{ - collections::HashSet, - env, - fs::File, - io::Write, - path::{Path, PathBuf}, - process::Command, -}; +use std::{collections::HashSet, env, fs::File, io::Write, path::Path, process::Command}; struct TrackedEnv { tracked: HashSet, @@ -137,7 +130,7 @@ fn main() { // in a type-safe way, which is necessary for incrementally building certain payloads, like // the ones generated in the `datadog_metrics` sink. let protobuf_fds_path = - PathBuf::from(std::env::var("OUT_DIR").expect("OUT_DIR environment variable not set")) + Path::new(&std::env::var("OUT_DIR").expect("OUT_DIR environment variable not set")) .join("protobuf-fds.bin"); let mut prost_build = prost_build::Config::new(); diff --git a/src/transforms/lua/mod.rs b/src/transforms/lua/mod.rs index 24052542ac8b5..ee9767f9f20b2 100644 --- a/src/transforms/lua/mod.rs +++ b/src/transforms/lua/mod.rs @@ -2,7 +2,7 @@ pub mod v1; pub mod v2; use vector_config::configurable_component; -use vector_core::config::LogNamespace; +use vector_core::config::{ComponentKey, LogNamespace}; use crate::{ config::{GenerateConfig, Input, OutputId, TransformConfig, TransformContext, TransformOutput}, @@ -89,10 +89,14 @@ impl GenerateConfig for LuaConfig { #[async_trait::async_trait] #[typetag::serde(name = "lua")] impl TransformConfig for LuaConfig { - async fn build(&self, _context: &TransformContext) -> crate::Result { + async fn build(&self, context: &TransformContext) -> crate::Result { + let key = context + .key + .as_ref() + .map_or_else(|| ComponentKey::from("lua"), Clone::clone); match self { LuaConfig::V1(v1) => v1.config.build(), - LuaConfig::V2(v2) => v2.config.build(), + LuaConfig::V2(v2) => v2.config.build(key), } } diff --git a/src/transforms/lua/v1/mod.rs b/src/transforms/lua/v1/mod.rs index 4daae33000530..f5b143214141b 100644 --- a/src/transforms/lua/v1/mod.rs +++ b/src/transforms/lua/v1/mod.rs @@ -146,6 +146,7 @@ impl Lua { } fn process(&mut self, event: Event) -> Result, mlua::Error> { + let source_id = event.source_id().cloned(); let lua = &self.lua; let globals = lua.globals(); @@ -156,7 +157,15 @@ impl Lua { let result = globals .raw_get::<_, Option>("event") - .map(|option| option.map(|lua_event| lua_event.inner)); + .map(|option| { + option.map(|lua_event| { + let mut event = lua_event.inner; + if let Some(source_id) = source_id { + event.set_source_id(source_id); + } + event + }) + }); self.invocations_after_gc += 1; if self.invocations_after_gc % GC_INTERVAL == 0 { @@ -302,167 +311,130 @@ pub fn format_error(error: &mlua::Error) -> String { #[cfg(test)] mod tests { + use std::sync::Arc; + use super::*; use crate::event::{Event, LogEvent, Value}; + use crate::{config::ComponentKey, test_util}; #[test] fn lua_add_field() { - crate::test_util::trace_init(); - let mut transform = Lua::new( + let event = transform_one( r#" event["hello"] = "goodbye" - "# - .to_string(), - vec![], + "#, + LogEvent::from("program me"), ) .unwrap(); - let event = Event::Log(LogEvent::from("program me")); - - let event = transform.transform_one(event).unwrap(); - assert_eq!(event.as_log()["hello"], "goodbye".into()); } #[test] fn lua_read_field() { - crate::test_util::trace_init(); - let mut transform = Lua::new( + let event = transform_one( r#" _, _, name = string.find(event["message"], "Hello, my name is (%a+).") event["name"] = name - "# - .to_string(), - vec![], + "#, + LogEvent::from("Hello, my name is Bob."), ) .unwrap(); - let event = Event::Log(LogEvent::from("Hello, my name is Bob.")); - - let event = transform.transform_one(event).unwrap(); - assert_eq!(event.as_log()["name"], "Bob".into()); } #[test] fn lua_remove_field() { - crate::test_util::trace_init(); - let mut transform = Lua::new( + let mut log = LogEvent::default(); + log.insert("name", "Bob"); + let event = transform_one( r#" event["name"] = nil - "# - .to_string(), - vec![], + "#, + log, ) .unwrap(); - let mut log = LogEvent::default(); - log.insert("name", "Bob"); - let event = transform.transform_one(log.into()).unwrap(); - assert!(event.as_log().get("name").is_none()); } #[test] fn lua_drop_event() { - let mut transform = Lua::new( - r#" - event = nil - "# - .to_string(), - vec![], - ) - .unwrap(); - let mut log = LogEvent::default(); log.insert("name", "Bob"); - let event = transform.transform_one(log.into()); + let event = transform_one( + r#" + event = nil + "#, + log, + ); assert!(event.is_none()); } #[test] fn lua_read_empty_field() { - crate::test_util::trace_init(); - let mut transform = Lua::new( + let event = transform_one( r#" if event["non-existant"] == nil then event["result"] = "empty" else event["result"] = "found" end - "# - .to_string(), - vec![], + "#, + LogEvent::default(), ) .unwrap(); - let event = transform.transform_one(LogEvent::default().into()).unwrap(); - assert_eq!(event.as_log()["result"], "empty".into()); } #[test] fn lua_integer_value() { - crate::test_util::trace_init(); - let mut transform = Lua::new( + let event = transform_one( r#" event["number"] = 3 - "# - .to_string(), - vec![], + "#, + LogEvent::default(), ) .unwrap(); - - let event = transform.transform_one(LogEvent::default().into()).unwrap(); assert_eq!(event.as_log()["number"], Value::Integer(3)); } #[test] fn lua_numeric_value() { - crate::test_util::trace_init(); - let mut transform = Lua::new( + let event = transform_one( r#" event["number"] = 3.14159 - "# - .to_string(), - vec![], + "#, + LogEvent::default(), ) .unwrap(); - - let event = transform.transform_one(LogEvent::default().into()).unwrap(); assert_eq!(event.as_log()["number"], Value::from(3.14159)); } #[test] fn lua_boolean_value() { - crate::test_util::trace_init(); - let mut transform = Lua::new( + let event = transform_one( r#" event["bool"] = true - "# - .to_string(), - vec![], + "#, + LogEvent::default(), ) .unwrap(); - - let event = transform.transform_one(LogEvent::default().into()).unwrap(); assert_eq!(event.as_log()["bool"], Value::Boolean(true)); } #[test] fn lua_non_coercible_value() { - crate::test_util::trace_init(); - let mut transform = Lua::new( + let event = transform_one( r#" event["junk"] = {"asdf"} - "# - .to_string(), - vec![], + "#, + LogEvent::default(), ) .unwrap(); - - let event = transform.transform_one(LogEvent::default().into()).unwrap(); assert_eq!(event.as_log().get("junk"), None); } @@ -573,32 +545,48 @@ mod tests { let mut transform = Lua::new(source, vec![dir.path().to_string_lossy().into_owned()]).unwrap(); - let event = LogEvent::default().into(); - let event = transform.transform_one(event).unwrap(); + let event = transform.transform_one(LogEvent::default().into()).unwrap(); assert_eq!(event.as_log()["\"new field\""], "new value".into()); } #[test] fn lua_pairs() { - crate::test_util::trace_init(); - let mut transform = Lua::new( + let mut event = LogEvent::default(); + event.insert("name", "Bob"); + event.insert("friend", "Alice"); + + let event = transform_one( r#" for k,v in pairs(event) do event[k] = k .. v end - "# - .to_string(), - vec![], + "#, + event, ) .unwrap(); - let mut event = LogEvent::default(); - event.insert("name", "Bob"); - event.insert("friend", "Alice"); - - let event = transform.transform_one(event.into()).unwrap(); - assert_eq!(event.as_log()["name"], "nameBob".into()); assert_eq!(event.as_log()["friend"], "friendAlice".into()); } + + fn transform_one(transform: &str, event: impl Into) -> Option { + crate::test_util::trace_init(); + + let source = source_id(); + let mut event = event.into(); + event.set_source_id(Arc::clone(&source)); + + let mut transform = Lua::new(transform.to_string(), vec![]).unwrap(); + let event = transform.transform_one(event); + + if let Some(event) = &event { + assert_eq!(event.source_id(), Some(&source)); + } + + event + } + + fn source_id() -> Arc { + Arc::new(ComponentKey::from(test_util::random_string(16))) + } } diff --git a/src/transforms/lua/v2/mod.rs b/src/transforms/lua/v2/mod.rs index a6a65cde824ef..787e69731f301 100644 --- a/src/transforms/lua/v2/mod.rs +++ b/src/transforms/lua/v2/mod.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, time::Duration}; +use std::{path::PathBuf, sync::Arc, time::Duration}; use codecs::MetricTagValues; use serde_with::serde_as; @@ -7,7 +7,7 @@ use vector_config::configurable_component; pub use vector_core::event::lua; use vector_core::transform::runtime_transform::{RuntimeTransform, Timer}; -use crate::config::OutputId; +use crate::config::{ComponentKey, OutputId}; use crate::event::lua::event::LuaEvent; use crate::schema::Definition; use crate::{ @@ -172,8 +172,8 @@ struct TimerConfig { } impl LuaConfig { - pub fn build(&self) -> crate::Result { - Lua::new(self).map(Transform::event_task) + pub fn build(&self, key: ComponentKey) -> crate::Result { + Lua::new(self, key).map(Transform::event_task) } pub fn input(&self) -> Input { @@ -223,6 +223,7 @@ pub struct Lua { hook_shutdown: Option, timers: Vec<(Timer, mlua::RegistryKey)>, multi_value_tags: bool, + source_id: Arc, } // Helper to create `RegistryKey` from Lua function code @@ -233,7 +234,7 @@ fn make_registry_value(lua: &mlua::Lua, source: &str) -> mlua::Result crate::Result { + pub fn new(config: &LuaConfig, key: ComponentKey) -> crate::Result { // In order to support loading C modules in Lua, we need to create unsafe instance // without debug library. let lua = unsafe { @@ -301,14 +302,19 @@ impl Lua { hook_process, hook_shutdown, multi_value_tags, + source_id: Arc::new(key), }) } #[cfg(test)] fn process(&mut self, event: Event, output: &mut Vec) -> Result<(), mlua::Error> { + let source_id = event.source_id().cloned(); let lua = &self.lua; let result = lua.scope(|scope| { - let emit = scope.create_function_mut(|_, event: Event| { + let emit = scope.create_function_mut(|_, mut event: Event| { + if let Some(source_id) = &source_id { + event.set_source_id(Arc::clone(source_id)); + } output.push(event); Ok(()) })?; @@ -355,11 +361,13 @@ impl Lua { fn wrap_emit_fn<'lua, 'scope, F: 'scope>( scope: &mlua::Scope<'lua, 'scope>, mut emit_fn: F, + source_id: Arc, ) -> mlua::Result> where F: FnMut(Event), { - scope.create_function_mut(move |_, event: Event| -> mlua::Result<()> { + scope.create_function_mut(move |_, mut event: Event| -> mlua::Result<()> { + event.set_source_id(Arc::clone(&source_id)); emit_fn(event); Ok(()) }) @@ -371,6 +379,7 @@ impl RuntimeTransform for Lua { F: FnMut(Event), { let lua = &self.lua; + let source_id = Arc::clone(event.source_id().unwrap_or(&self.source_id)); _ = lua .scope(|scope| -> mlua::Result<()> { lua.registry_value::(&self.hook_process)? @@ -379,7 +388,7 @@ impl RuntimeTransform for Lua { event, metric_multi_value_tags: self.multi_value_tags, }, - wrap_emit_fn(scope, emit_fn)?, + wrap_emit_fn(scope, emit_fn, source_id)?, )) }) .context(RuntimeErrorHooksProcessSnafu) @@ -398,7 +407,7 @@ impl RuntimeTransform for Lua { match &self.hook_init { Some(key) => lua .registry_value::(key)? - .call(wrap_emit_fn(scope, emit_fn)?), + .call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?), None => Ok(()), } }) @@ -418,7 +427,7 @@ impl RuntimeTransform for Lua { match &self.hook_shutdown { Some(key) => lua .registry_value::(key)? - .call(wrap_emit_fn(scope, emit_fn)?), + .call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?), None => Ok(()), } }) @@ -437,7 +446,7 @@ impl RuntimeTransform for Lua { .scope(|scope| -> mlua::Result<()> { let handler_key = &self.timers[timer.id as usize].1; lua.registry_value::(handler_key)? - .call(wrap_emit_fn(scope, emit_fn)?) + .call(wrap_emit_fn(scope, emit_fn, Arc::clone(&self.source_id))?) }) .context(RuntimeErrorTimerHandlerSnafu) .map_err(|error| error!(%error, rate_limit = 30)); @@ -450,41 +459,44 @@ impl RuntimeTransform for Lua { } } -#[cfg(test)] -fn format_error(error: &mlua::Error) -> String { - match error { - mlua::Error::CallbackError { traceback, cause } => format_error(cause) + "\n" + traceback, - err => err.to_string(), - } -} - #[cfg(test)] mod tests { - use std::future::Future; - use std::sync::Arc; - use tokio::sync::mpsc; - use tokio::sync::mpsc::{Receiver, Sender}; + use std::{future::Future, sync::Arc}; + + use similar_asserts::assert_eq; + use tokio::sync::mpsc::{self, Receiver, Sender}; + use tokio::sync::Mutex; use tokio_stream::wrappers::ReceiverStream; use super::*; - use crate::test_util::components::assert_transform_compliance; + use crate::test_util::{components::assert_transform_compliance, random_string}; use crate::transforms::test::create_topology; use crate::{ event::{ metric::{Metric, MetricKind, MetricValue}, Event, LogEvent, Value, }, - test_util::trace_init, + test_util, }; + fn format_error(error: &mlua::Error) -> String { + match error { + mlua::Error::CallbackError { traceback, cause } => { + format_error(cause) + "\n" + traceback + } + err => err.to_string(), + } + } + fn from_config(config: &str) -> crate::Result> { - Lua::new(&toml::from_str(config).unwrap()).map(Box::new) + Lua::new(&toml::from_str(config).unwrap(), "transform".into()).map(Box::new) } async fn run_transform( config: &str, - func: impl FnOnce(Sender, Arc>>) -> T, + func: impl FnOnce(Sender, Arc>>) -> T, ) -> T::Output { + test_util::trace_init(); assert_transform_compliance(async move { let config = super::super::LuaConfig::V2(toml::from_str(config).unwrap()); let (tx, rx) = mpsc::channel(1); @@ -492,9 +504,8 @@ mod tests { let out = Arc::new(tokio::sync::Mutex::new(out)); - let result = func(tx.clone(), Arc::clone(&out)).await; + let result = func(tx, Arc::clone(&out)).await; - drop(tx); topology.stop().await; assert_eq!(out.lock().await.recv().await, None); @@ -503,10 +514,59 @@ mod tests { .await } + async fn next_event(out: &Arc>>, source: &str) -> Event { + let event = out + .lock() + .await + .recv() + .await + .expect("Event was not received"); + assert_eq!( + event.source_id(), + Some(&Arc::new(ComponentKey::from(source))) + ); + event + } + #[tokio::test] - async fn lua_add_field() { - trace_init(); + async fn lua_runs_init_hook() { + let line1 = random_string(9); + run_transform( + &format!( + r#" + version = "2" + hooks.init = """function (emit) + event = {{log={{message="{line1}"}}}} + emit(event) + end + """ + hooks.process = """function (event, emit) + emit(event) + end + """ + "# + ), + |tx, out| async move { + let line2 = random_string(9); + tx.send(Event::Log(LogEvent::from(line2.as_str()))) + .await + .unwrap(); + drop(tx); + assert_eq!( + next_event(&out, "transform").await.as_log()["message"], + line1.into() + ); + assert_eq!( + next_event(&out, "in").await.as_log()["message"], + line2.into(), + ); + }, + ) + .await; + } + #[tokio::test] + async fn lua_add_field() { run_transform( r#" version = "2" @@ -521,7 +581,7 @@ mod tests { tx.send(event).await.unwrap(); assert_eq!( - out.lock().await.recv().await.unwrap().as_log()["hello"], + next_event(&out, "in").await.as_log()["hello"], "goodbye".into() ); }, @@ -531,8 +591,6 @@ mod tests { #[tokio::test] async fn lua_read_field() { - trace_init(); - run_transform( r#" version = "2" @@ -547,10 +605,7 @@ mod tests { let event = Event::Log(LogEvent::from("Hello, my name is Bob.")); tx.send(event).await.unwrap(); - assert_eq!( - out.lock().await.recv().await.unwrap().as_log()["name"], - "Bob".into() - ); + assert_eq!(next_event(&out, "in").await.as_log()["name"], "Bob".into()); }, ) .await; @@ -558,8 +613,6 @@ mod tests { #[tokio::test] async fn lua_remove_field() { - trace_init(); - run_transform( r#" version = "2" @@ -575,10 +628,7 @@ mod tests { tx.send(event.into()).await.unwrap(); - assert_eq!( - out.lock().await.recv().await.unwrap().as_log().get("name"), - None - ); + assert_eq!(next_event(&out, "in").await.as_log().get("name"), None); }, ) .await; @@ -586,8 +636,6 @@ mod tests { #[tokio::test] async fn lua_drop_event() { - trace_init(); - run_transform( r#" version = "2" @@ -608,8 +656,6 @@ mod tests { #[tokio::test] async fn lua_duplicate_event() { - trace_init(); - run_transform( r#" version = "2" @@ -633,8 +679,6 @@ mod tests { #[tokio::test] async fn lua_read_empty_field() { - trace_init(); - run_transform( r#" version = "2" @@ -653,7 +697,7 @@ mod tests { tx.send(event.into()).await.unwrap(); assert_eq!( - out.lock().await.recv().await.unwrap().as_log()["result"], + next_event(&out, "in").await.as_log()["result"], "empty".into() ); }, @@ -663,7 +707,6 @@ mod tests { #[tokio::test] async fn lua_integer_value() { - trace_init(); run_transform( r#" version = "2" @@ -678,7 +721,7 @@ mod tests { tx.send(event.into()).await.unwrap(); assert_eq!( - out.lock().await.recv().await.unwrap().as_log()["number"], + next_event(&out, "in").await.as_log()["number"], Value::Integer(3) ); }, @@ -688,8 +731,6 @@ mod tests { #[tokio::test] async fn lua_numeric_value() { - trace_init(); - run_transform( r#" version = "2" @@ -704,7 +745,7 @@ mod tests { tx.send(event.into()).await.unwrap(); assert_eq!( - out.lock().await.recv().await.unwrap().as_log()["number"], + next_event(&out, "in").await.as_log()["number"], Value::from(3.14159) ); }, @@ -714,8 +755,6 @@ mod tests { #[tokio::test] async fn lua_boolean_value() { - trace_init(); - run_transform( r#" version = "2" @@ -730,7 +769,7 @@ mod tests { tx.send(event.into()).await.unwrap(); assert_eq!( - out.lock().await.recv().await.unwrap().as_log()["bool"], + next_event(&out, "in").await.as_log()["bool"], Value::Boolean(true) ); }, @@ -740,7 +779,6 @@ mod tests { #[tokio::test] async fn lua_non_coercible_value() { - trace_init(); run_transform( r#" version = "2" @@ -754,10 +792,7 @@ mod tests { let event = LogEvent::default(); tx.send(event.into()).await.unwrap(); - assert_eq!( - out.lock().await.recv().await.unwrap().as_log().get("junk"), - None - ); + assert_eq!(next_event(&out, "in").await.as_log().get("junk"), None); }, ) .await; @@ -765,8 +800,6 @@ mod tests { #[tokio::test] async fn lua_non_string_key_write() -> crate::Result<()> { - trace_init(); - let mut transform = from_config( r#" hooks.process = """function (event, emit) @@ -792,8 +825,6 @@ mod tests { #[tokio::test] async fn lua_non_string_key_read() { - trace_init(); - run_transform( r#" version = "2" @@ -807,16 +838,7 @@ mod tests { let event = LogEvent::default(); tx.send(event.into()).await.unwrap(); - assert_eq!( - out.lock() - .await - .recv() - .await - .unwrap() - .as_log() - .get("result"), - None - ); + assert_eq!(next_event(&out, "in").await.as_log().get("result"), None); }, ) .await; @@ -824,8 +846,6 @@ mod tests { #[tokio::test] async fn lua_script_error() -> crate::Result<()> { - trace_init(); - let mut transform = from_config( r#" hooks.process = """function (event, emit) @@ -846,8 +866,6 @@ mod tests { #[tokio::test] async fn lua_syntax_error() -> crate::Result<()> { - trace_init(); - let err = from_config( r#" hooks.process = """function (event, emit) @@ -867,7 +885,6 @@ mod tests { #[tokio::test] async fn lua_load_file() { use std::{fs::File, io::Write}; - trace_init(); let dir = tempfile::tempdir().unwrap(); let mut file = File::create(dir.path().join("script2.lua")).unwrap(); @@ -905,7 +922,7 @@ mod tests { tx.send(event.into()).await.unwrap(); assert_eq!( - out.lock().await.recv().await.unwrap().as_log()["\"new field\""], + next_event(&out, "in").await.as_log()["\"new field\""], "new value".into() ); }, @@ -915,7 +932,6 @@ mod tests { #[tokio::test] async fn lua_pairs() { - trace_init(); run_transform( r#" version = "2" @@ -933,7 +949,7 @@ mod tests { event.insert("friend", "Alice"); tx.send(event.into()).await.unwrap(); - let output = out.lock().await.recv().await.unwrap(); + let output = next_event(&out, "in").await; assert_eq!(output.as_log()["name"], "nameBob".into()); assert_eq!(output.as_log()["friend"], "friendAlice".into()); @@ -944,7 +960,6 @@ mod tests { #[tokio::test] async fn lua_metric() { - trace_init(); run_transform( r#" version = "2" @@ -964,16 +979,13 @@ mod tests { let mut expected = metric .clone() .with_value(MetricValue::Counter { value: 2.0 }); - expected - .metadata_mut() - .set_upstream_id(Arc::new(OutputId::from("transform"))); + let metadata = expected.metadata_mut(); + metadata.set_upstream_id(Arc::new(OutputId::from("transform"))); + metadata.set_source_id(Arc::new(ComponentKey::from("in"))); tx.send(metric.into()).await.unwrap(); - assert_eq!( - out.lock().await.recv().await.unwrap().as_metric(), - &expected - ); + assert_eq!(next_event(&out, "in").await.as_metric(), &expected); }, ) .await; @@ -981,7 +993,6 @@ mod tests { #[tokio::test] async fn lua_multiple_events() { - trace_init(); run_transform( r#" version = "2" diff --git a/src/transforms/mod.rs b/src/transforms/mod.rs index bc0ef66b8936b..b78dda290b9aa 100644 --- a/src/transforms/mod.rs +++ b/src/transforms/mod.rs @@ -90,6 +90,7 @@ mod test { let (tx, rx) = mpsc::channel(1); + // TODO: Use non-hard-coded names to improve tests. builder.add_source("in", UnitTestStreamSourceConfig::new(events)); builder.add_transform("transform", &["in"], transform_config); builder.add_sink(