diff --git a/lib/vector-core/src/config/log_schema.rs b/lib/vector-core/src/config/log_schema.rs index 88bec5a328674..7520702b17776 100644 --- a/lib/vector-core/src/config/log_schema.rs +++ b/lib/vector-core/src/config/log_schema.rs @@ -73,7 +73,7 @@ pub struct LogSchema { /// Generally, this field will be set by Vector to hold event-specific metadata, such as /// annotations by the `remap` transform when an error or abort is encountered. #[serde(default = "LogSchema::default_metadata_key")] - metadata_key: String, + metadata_key: OptionalValuePath, } impl Default for LogSchema { @@ -105,8 +105,8 @@ impl LogSchema { OptionalValuePath::new(SOURCE_TYPE) } - fn default_metadata_key() -> String { - String::from(METADATA) + fn default_metadata_key() -> OptionalValuePath { + OptionalValuePath::new(METADATA) } pub fn message_key(&self) -> Option<&OwnedValuePath> { @@ -134,8 +134,8 @@ impl LogSchema { self.source_type_key.path.as_ref() } - pub fn metadata_key(&self) -> &str { - &self.metadata_key + pub fn metadata_key(&self) -> Option<&OwnedValuePath> { + self.metadata_key.path.as_ref() } pub fn set_message_key(&mut self, path: Option) { @@ -154,8 +154,8 @@ impl LogSchema { self.source_type_key = OptionalValuePath { path }; } - pub fn set_metadata_key(&mut self, v: String) { - self.metadata_key = v; + pub fn set_metadata_key(&mut self, path: Option) { + self.metadata_key = OptionalValuePath { path }; } /// Merge two `LogSchema` instances together. @@ -202,7 +202,7 @@ impl LogSchema { { errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned()); } else { - self.set_metadata_key(other.metadata_key().to_string()); + self.set_metadata_key(other.metadata_key().cloned()); } } diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index 3760b7ad286af..21da7413dbc4a 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -7,6 +7,7 @@ use vector_common::{ internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags, EventDataEq, }; +use vrl::path::{PathPrefix, ValuePath}; use super::{ BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata, @@ -84,6 +85,8 @@ impl TraceEvent { self.0.contains(key.as_ref()) } + // TODO This should eventually use TargetPath for the `key` parameter. + // https://github.com/vectordotdev/vector/issues/18059 pub fn insert( &mut self, key: impl AsRef, @@ -91,6 +94,20 @@ impl TraceEvent { ) -> Option { self.0.insert(key.as_ref(), value.into()) } + + // TODO Audit code and use this if possible. + // https://github.com/vectordotdev/vector/issues/18059 + pub fn maybe_insert<'a, F: FnOnce() -> Value>( + &mut self, + prefix: PathPrefix, + path: Option>, + value_callback: F, + ) -> Option { + if let Some(path) = path { + return self.0.insert((prefix, path), value_callback()); + } + None + } } impl From for TraceEvent { diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index b47414f3b526b..d7060c40562cf 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -7,8 +7,7 @@ use std::{ }; use codecs::MetricTagValues; -use lookup::lookup_v2::{parse_value_path, ValuePath}; -use lookup::{metadata_path, owned_value_path, path, PathPrefix}; +use lookup::{metadata_path, owned_value_path, PathPrefix}; use snafu::{ResultExt, Snafu}; use vector_common::TimeZone; use vector_config::configurable_component; @@ -20,6 +19,8 @@ use vrl::compiler::runtime::{Runtime, Terminate}; use vrl::compiler::state::ExternalEnv; use vrl::compiler::{CompileConfig, ExpressionError, Function, Program, TypeState, VrlRuntime}; use vrl::diagnostic::{DiagnosticMessage, Formatter, Note}; +use vrl::path; +use vrl::path::ValuePath; use vrl::value::{Kind, Value}; use crate::config::OutputId; @@ -288,7 +289,7 @@ impl TransformConfig for RemapConfig { let dropped_definition = Definition::combine_log_namespaces( input_definition.log_namespaces(), input_definition.clone().with_event_field( - &parse_value_path(log_schema().metadata_key()).expect("valid metadata key"), + log_schema().metadata_key().expect("valid metadata key"), Kind::object(BTreeMap::from([ ("reason".into(), Kind::bytes()), ("message".into(), Kind::bytes()), @@ -451,13 +452,12 @@ where match event { Event::Log(ref mut log) => match log.namespace() { LogNamespace::Legacy => { - log.insert( - ( - PathPrefix::Event, - log_schema().metadata_key().concat(path!("dropped")), - ), - self.dropped_data(reason, error), - ); + if let Some(metadata_key) = log_schema().metadata_key() { + log.insert( + (PathPrefix::Event, metadata_key.concat(path!("dropped"))), + self.dropped_data(reason, error), + ); + } } LogNamespace::Vector => { log.insert( @@ -467,23 +467,29 @@ where } }, Event::Metric(ref mut metric) => { - let m = log_schema().metadata_key(); - metric.replace_tag(format!("{}.dropped.reason", m), reason.into()); - metric.replace_tag( - format!("{}.dropped.component_id", m), - self.component_key - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(String::new), - ); - metric.replace_tag(format!("{}.dropped.component_type", m), "remap".into()); - metric.replace_tag(format!("{}.dropped.component_kind", m), "transform".into()); + if let Some(metadata_key) = log_schema().metadata_key() { + metric.replace_tag(format!("{}.dropped.reason", metadata_key), reason.into()); + metric.replace_tag( + format!("{}.dropped.component_id", metadata_key), + self.component_key + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(String::new), + ); + metric.replace_tag( + format!("{}.dropped.component_type", metadata_key), + "remap".into(), + ); + metric.replace_tag( + format!("{}.dropped.component_kind", metadata_key), + "transform".into(), + ); + } } Event::Trace(ref mut trace) => { - trace.insert( - log_schema().metadata_key(), - self.dropped_data(reason, error), - ); + trace.maybe_insert(PathPrefix::Event, log_schema().metadata_key(), || { + self.dropped_data(reason, error).into() + }); } } }