From 52e3e8bf130f799a5429c8e391d2a4d669c21f5b Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Fri, 14 Jul 2023 12:55:47 -0400 Subject: [PATCH 1/9] feat: Migrate LogSchem::host_key to new lookup code --- lib/vector-core/src/config/log_schema.rs | 16 ++--- lib/vector-core/src/event/log_event.rs | 6 +- .../src/lookup_v2/optional_path.rs | 8 +++ src/config/mod.rs | 10 ++- src/sinks/datadog/metrics/encoder.rs | 9 ++- src/sinks/datadog/traces/sink.rs | 13 ++-- src/sinks/humio/logs.rs | 13 ++-- src/sinks/humio/mod.rs | 2 +- src/sinks/influxdb/logs.rs | 24 ++++--- src/sinks/splunk_hec/common/util.rs | 2 +- src/sources/datadog_agent/metrics.rs | 16 +++-- src/sources/datadog_agent/mod.rs | 6 +- src/sources/datadog_agent/traces.rs | 4 +- src/sources/dnstap/mod.rs | 10 +-- src/sources/docker_logs/mod.rs | 2 +- src/sources/exec/mod.rs | 26 +++++-- src/sources/file.rs | 4 +- src/sources/file_descriptors/mod.rs | 24 +++---- src/sources/fluent/mod.rs | 7 +- src/sources/heroku_logs.rs | 21 ++++-- src/sources/internal_logs.rs | 2 +- src/sources/internal_metrics.rs | 20 +++--- src/sources/journald.rs | 11 +-- src/sources/logstash.rs | 8 +-- src/sources/socket/mod.rs | 2 +- src/sources/socket/udp.rs | 2 +- src/sources/splunk_hec/mod.rs | 69 +++++++++++++------ src/sources/syslog.rs | 24 ++++--- src/transforms/dedupe.rs | 8 +-- src/transforms/metric_to_log.rs | 15 ++-- 30 files changed, 232 insertions(+), 152 deletions(-) diff --git a/lib/vector-core/src/config/log_schema.rs b/lib/vector-core/src/config/log_schema.rs index 9f9eab7cbc519..8b23b1be7b1c6 100644 --- a/lib/vector-core/src/config/log_schema.rs +++ b/lib/vector-core/src/config/log_schema.rs @@ -55,7 +55,7 @@ pub struct LogSchema { /// This field will generally represent a real host, or container, that generated the message, /// but is somewhat source-dependent. #[serde(default = "LogSchema::default_host_key")] - host_key: String, + host_key: OptionalValuePath, /// The name of the event field to set the source identifier in. /// @@ -92,8 +92,8 @@ impl LogSchema { OptionalValuePath::new("timestamp") } - fn default_host_key() -> String { - String::from("host") + fn default_host_key() -> OptionalValuePath { + OptionalValuePath::new("host") } fn default_source_type_key() -> OptionalValuePath { @@ -121,8 +121,8 @@ impl LogSchema { self.timestamp_key.path.as_ref() } - pub fn host_key(&self) -> &str { - &self.host_key + pub fn host_key(&self) -> Option<&OwnedValuePath> { + self.host_key.path.as_ref() } pub fn source_type_key(&self) -> Option<&OwnedValuePath> { @@ -141,8 +141,8 @@ impl LogSchema { self.timestamp_key = OptionalValuePath { path: v }; } - pub fn set_host_key(&mut self, v: String) { - self.host_key = v; + pub fn set_host_key(&mut self, path: Option) { + self.host_key = OptionalValuePath { path }; } pub fn set_source_type_key(&mut self, path: Option) { @@ -169,7 +169,7 @@ impl LogSchema { { errors.push("conflicting values for 'log_schema.host_key' found".to_owned()); } else { - self.set_host_key(other.host_key().to_string()); + self.set_host_key(other.host_key().cloned()); } if self.message_key() != LOG_SCHEMA_DEFAULT.message_key() && self.message_key() != other.message_key() diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index e11423e122d13..41633b32d93d5 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -458,7 +458,7 @@ impl LogEvent { pub fn host_path(&self) -> Option { match self.namespace() { LogNamespace::Vector => self.find_key_by_meaning("host"), - LogNamespace::Legacy => Some(log_schema().host_key().to_owned()), + LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string), } } @@ -505,7 +505,9 @@ impl LogEvent { pub fn get_host(&self) -> Option<&Value> { match self.namespace() { LogNamespace::Vector => self.get_by_meaning("host"), - LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().host_key())), + LogNamespace::Legacy => log_schema() + .host_key() + .and_then(|key| self.get((PathPrefix::Event, key))), } } diff --git a/lib/vector-lookup/src/lookup_v2/optional_path.rs b/lib/vector-lookup/src/lookup_v2/optional_path.rs index ee15ed3509cf6..5bbfe4ad082d4 100644 --- a/lib/vector-lookup/src/lookup_v2/optional_path.rs +++ b/lib/vector-lookup/src/lookup_v2/optional_path.rs @@ -91,3 +91,11 @@ impl From for OptionalValuePath { Self { path: Some(path) } } } + +impl From> for OptionalValuePath { + fn from(value: Option) -> Self { + value.map_or(OptionalValuePath::none(), |path| { + OptionalValuePath::from(path) + }) + } +} diff --git a/src/config/mod.rs b/src/config/mod.rs index 40d0733f15cb0..799e8f6a3c7c3 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -843,7 +843,10 @@ mod tests { ) .unwrap(); - assert_eq!("host", config.global.log_schema.host_key().to_string()); + assert_eq!( + "host", + config.global.log_schema.host_key().unwrap().to_string() + ); assert_eq!( "message", config.global.log_schema.message_key().to_string() @@ -879,7 +882,10 @@ mod tests { ) .unwrap(); - assert_eq!("this", config.global.log_schema.host_key().to_string()); + assert_eq!( + "this", + config.global.log_schema.host_key().unwrap().to_string() + ); assert_eq!("that", config.global.log_schema.message_key().to_string()); assert_eq!( "then", diff --git a/src/sinks/datadog/metrics/encoder.rs b/src/sinks/datadog/metrics/encoder.rs index 0dd6c393e31b5..26e4be8016f1c 100644 --- a/src/sinks/datadog/metrics/encoder.rs +++ b/src/sinks/datadog/metrics/encoder.rs @@ -385,7 +385,9 @@ fn sketch_to_proto_message( let name = get_namespaced_name(metric, default_namespace); let ts = encode_timestamp(metric.timestamp()); let mut tags = metric.tags().cloned().unwrap_or_default(); - let host = tags.remove(log_schema.host_key()).unwrap_or_default(); + let host = tags + .remove(log_schema.host_key().unwrap().to_string().as_str()) + .unwrap_or_default(); let tags = encode_tags(&tags); let cnt = ddsketch.count() as i64; @@ -497,7 +499,10 @@ fn generate_series_metrics( let name = get_namespaced_name(metric, default_namespace); let mut tags = metric.tags().cloned().unwrap_or_default(); - let host = tags.remove(log_schema.host_key()); + let host = log_schema + .host_key() + .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default()); + let source_type_name = tags.remove("source_type_name"); let device = tags.remove("device"); let ts = encode_timestamp(metric.timestamp()); diff --git a/src/sinks/datadog/traces/sink.rs b/src/sinks/datadog/traces/sink.rs index 910e108dfebe0..fa1e2cc3f2280 100644 --- a/src/sinks/datadog/traces/sink.rs +++ b/src/sinks/datadog/traces/sink.rs @@ -7,6 +7,8 @@ use futures_util::{ }; use tokio::sync::oneshot::{channel, Sender}; use tower::Service; +use vrl::path::PathPrefix; + use vector_core::{ config::log_schema, event::Event, @@ -15,11 +17,13 @@ use vector_core::{ stream::{BatcherSettings, DriverResponse}, }; -use super::service::TraceApiRequest; use crate::{ internal_events::DatadogTracesEncodingError, sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt}, }; + +use super::service::TraceApiRequest; + #[derive(Default)] struct EventPartitioner; @@ -51,9 +55,10 @@ impl Partitioner for EventPartitioner { Event::Trace(t) => PartitionKey { api_key: item.metadata().datadog_api_key(), env: t.get("env").map(|s| s.to_string_lossy().into_owned()), - hostname: t - .get(log_schema().host_key()) - .map(|s| s.to_string_lossy().into_owned()), + hostname: log_schema().host_key().and_then(|key| { + t.get((PathPrefix::Event, key)) + .map(|s| s.to_string_lossy().into_owned()) + }), agent_version: t .get("agent_version") .map(|s| s.to_string_lossy().into_owned()), diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index b87446ff015f7..7a4b94a7bfc54 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -231,6 +231,7 @@ mod integration_tests { use serde::Deserialize; use serde_json::{json, Value as JsonValue}; use tokio::time::Duration; + use vrl::path::PathPrefix; use super::*; use crate::{ @@ -262,14 +263,14 @@ mod integration_tests { let message = random_string(100); let host = "192.168.1.1".to_string(); let mut event = LogEvent::from(message.clone()); - event.insert(log_schema().host_key(), host.clone()); + event.insert( + (PathPrefix::Event, log_schema().host_key().unwrap()), + host.clone(), + ); let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456); event.insert( - ( - lookup::PathPrefix::Event, - log_schema().timestamp_key().unwrap(), - ), + (PathPrefix::Event, log_schema().timestamp_key().unwrap()), ts, ); @@ -387,7 +388,7 @@ mod integration_tests { source: None, encoding: JsonSerializerConfig::default().into(), event_type: None, - host_key: log_schema().host_key().to_string(), + host_key: log_schema().host_key().unwrap().to_string(), indexed_fields: vec![], index: None, compression: Compression::None, diff --git a/src/sinks/humio/mod.rs b/src/sinks/humio/mod.rs index 4601bbaef5aae..df62b1118d0a5 100644 --- a/src/sinks/humio/mod.rs +++ b/src/sinks/humio/mod.rs @@ -2,5 +2,5 @@ pub mod logs; pub mod metrics; fn host_key() -> String { - crate::config::log_schema().host_key().to_string() + crate::config::log_schema().host_key().unwrap().to_string() } diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index ca12946f00bad..bc2936a022db5 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -4,12 +4,13 @@ use bytes::{Bytes, BytesMut}; use futures::SinkExt; use http::{Request, Uri}; use indoc::indoc; +use vrl::value::Kind; + use lookup::lookup_v2::{parse_value_path, OptionalValuePath}; use lookup::{OwnedValuePath, PathPrefix}; use vector_config::configurable_component; use vector_core::config::log_schema; use vector_core::schema; -use vrl::value::Kind; use crate::{ codecs::Transformer, @@ -189,10 +190,8 @@ impl SinkConfig for InfluxDbLogsConfig { .host_key .clone() .and_then(|k| k.path) - .unwrap_or_else(|| { - parse_value_path(log_schema().host_key()) - .expect("global log_schema.host_key to be valid path") - }); + .or(log_schema().host_key().cloned()) + .unwrap(); let message_key = self .message_key @@ -409,10 +408,10 @@ mod tests { use futures::{channel::mpsc, stream, StreamExt}; use http::{request::Parts, StatusCode}; use indoc::indoc; + use lookup::owned_value_path; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; - use super::*; use crate::{ sinks::{ influxdb::test_util::{assert_fields, split_line_protocol, ts}, @@ -427,6 +426,8 @@ mod tests { }, }; + use super::*; + type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>; #[test] @@ -880,16 +881,17 @@ mod tests { #[cfg(feature = "influxdb-integration-tests")] #[cfg(test)] mod integration_tests { + use std::sync::Arc; + use chrono::Utc; - use codecs::BytesDeserializerConfig; use futures::stream; + use vrl::value; + + use codecs::BytesDeserializerConfig; use lookup::{owned_value_path, path}; - use std::sync::Arc; use vector_core::config::{LegacyKey, LogNamespace}; use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent}; - use vrl::value; - use super::*; use crate::{ config::SinkContext, sinks::influxdb::{ @@ -900,6 +902,8 @@ mod integration_tests { test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, }; + use super::*; + #[tokio::test] async fn influxdb2_logs_put_data() { let endpoint = address_v2(); diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs index 64d6bb2dae06b..ce2a251e736e9 100644 --- a/src/sinks/splunk_hec/common/util.rs +++ b/src/sinks/splunk_hec/common/util.rs @@ -133,7 +133,7 @@ pub fn build_uri( } pub fn host_key() -> String { - crate::config::log_schema().host_key().to_string() + crate::config::log_schema().host_key().unwrap().to_string() } pub fn config_timestamp_key() -> OptionalValuePath { diff --git a/src/sources/datadog_agent/metrics.rs b/src/sources/datadog_agent/metrics.rs index 60be1e7cd2791..a391fc50860cf 100644 --- a/src/sources/datadog_agent/metrics.rs +++ b/src/sources/datadog_agent/metrics.rs @@ -5,9 +5,10 @@ use chrono::{TimeZone, Utc}; use http::StatusCode; use prost::Message; use serde::{Deserialize, Serialize}; +use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter}; + use vector_common::internal_event::{CountByteSize, InternalEventHandle as _, Registered}; use vector_core::{metrics::AgentDDSketch, EstimatedJsonEncodedSizeOf}; -use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter}; use crate::{ common::datadog::{DatadogMetricType, DatadogSeriesMetric}, @@ -243,7 +244,7 @@ pub(crate) fn decode_ddseries_v2( // As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189 // the hostname can be found in MetricSeries::resources and that is the only value stored there. if r.r#type.eq("host") { - tags.replace(log_schema().host_key().to_string(), r.name); + tags.replace(log_schema().host_key().unwrap().to_string(), r.name); } else { // But to avoid losing information if this situation changes, any other resource type/name will be saved in the tags map tags.replace(format!("resource.{}", r.r#type), r.name); @@ -385,9 +386,12 @@ fn into_vector_metric( ) -> Vec { let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default()); - dd_metric - .host - .and_then(|host| tags.replace(log_schema().host_key().to_owned(), host)); + if let Some(key) = log_schema().host_key() { + dd_metric + .host + .and_then(|host| tags.replace(key.to_string(), host)); + } + dd_metric .source_type_name .and_then(|source| tags.replace("source_type_name".into(), source)); @@ -498,7 +502,7 @@ pub(crate) fn decode_ddsketch( // sketch_series.distributions is also always empty from payload coming from dd agents let mut tags = into_metric_tags(sketch_series.tags); tags.replace( - log_schema().host_key().to_string(), + log_schema().host_key().unwrap().to_string(), sketch_series.host.clone(), ); sketch_series.dogsketches.into_iter().map(move |sketch| { diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 18536acf3adac..f089b21844117 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -283,7 +283,7 @@ pub struct ApiKeyQueryParams { #[derive(Clone)] pub(crate) struct DatadogAgentSource { pub(crate) api_key_extractor: ApiKeyExtractor, - pub(crate) log_schema_host_key: &'static str, + pub(crate) log_schema_host_key: String, pub(crate) log_schema_source_type_key: String, pub(crate) log_namespace: LogNamespace, pub(crate) decoder: Decoder, @@ -333,7 +333,9 @@ impl DatadogAgentSource { matcher: Regex::new(r"^/v1/input/(?P[[:alnum:]]{32})/??") .expect("static regex always compiles"), }, - log_schema_host_key: log_schema().host_key(), + log_schema_host_key: log_schema() + .host_key() + .map_or("".to_string(), |key| key.to_string()), log_schema_source_type_key: log_schema() .source_type_key() .map_or("".to_string(), |key| key.to_string()), diff --git a/src/sources/datadog_agent/traces.rs b/src/sources/datadog_agent/traces.rs index 889da554ba9ca..da42dfa23a05e 100644 --- a/src/sources/datadog_agent/traces.rs +++ b/src/sources/datadog_agent/traces.rs @@ -146,7 +146,7 @@ fn handle_dd_trace_payload_v1( Bytes::from("datadog_agent"), ); trace_event.insert("payload_version", "v2".to_string()); - trace_event.insert(source.log_schema_host_key, hostname.clone()); + trace_event.insert(source.log_schema_host_key.as_str(), hostname.clone()); trace_event.insert("env", env.clone()); trace_event.insert("agent_version", agent_version.clone()); trace_event.insert("target_tps", target_tps); @@ -259,7 +259,7 @@ fn handle_dd_trace_payload_v0( Bytes::from("datadog_agent"), ); trace_event.insert("payload_version", "v1".to_string()); - trace_event.insert(source.log_schema_host_key, hostname.clone()); + trace_event.insert(source.log_schema_host_key.as_str(), hostname.clone()); trace_event.insert("env", env.clone()); Event::Trace(trace_event) }) diff --git a/src/sources/dnstap/mod.rs b/src/sources/dnstap/mod.rs index 231c5fd096245..0bd44b4b5a2ea 100644 --- a/src/sources/dnstap/mod.rs +++ b/src/sources/dnstap/mod.rs @@ -22,7 +22,7 @@ pub use parser::{parse_dnstap_data, DnstapParser}; pub mod schema; use dnsmsg_parser::{dns_message, dns_message_parser}; -use lookup::lookup_v2::{parse_value_path, OptionalValuePath}; +use lookup::lookup_v2::OptionalValuePath; pub use schema::DnstapEventSchema; use vector_core::{ config::{LegacyKey, LogNamespace}, @@ -224,10 +224,10 @@ impl DnstapFrameHandler { let schema = DnstapConfig::event_schema(timestamp_key); - let host_key = config.host_key.clone().map_or_else( - || parse_value_path(log_schema().host_key()).ok(), - |k| k.path, - ); + let host_key = config + .host_key + .clone() + .map_or(log_schema().host_key().cloned(), |k| k.path); Self { max_frame_length: config.max_frame_length, diff --git a/src/sources/docker_logs/mod.rs b/src/sources/docker_logs/mod.rs index a37ae3260946c..03afcc4229fd8 100644 --- a/src/sources/docker_logs/mod.rs +++ b/src/sources/docker_logs/mod.rs @@ -188,7 +188,7 @@ impl Default for DockerLogsConfig { } fn default_host_key() -> OptionalValuePath { - OptionalValuePath::from(owned_value_path!(log_schema().host_key())) + log_schema().host_key().cloned().into() } fn default_partial_event_marker_field() -> Option { diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index 627a4f123bbf9..74720f9eeb22f 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -23,6 +23,7 @@ use tokio_util::codec::FramedRead; use vector_common::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol}; use vector_config::configurable_component; use vector_core::{config::LegacyKey, EstimatedJsonEncodedSizeOf}; +use vrl::path::OwnedValuePath; use vrl::value::Kind; use crate::{ @@ -276,9 +277,11 @@ impl SourceConfig for ExecConfig { .with_standard_vector_source_metadata() .with_source_metadata( Self::NAME, - Some(LegacyKey::InsertIfEmpty(owned_value_path!( - log_schema().host_key() - ))), + Some(LegacyKey::InsertIfEmpty( + log_schema() + .host_key() + .map_or(OwnedValuePath::root(), |key| key.clone()), + )), &owned_value_path!("host"), Kind::bytes().or_undefined(), Some("host"), @@ -666,7 +669,7 @@ fn handle_event( log_namespace.insert_source_metadata( ExecConfig::NAME, log, - Some(LegacyKey::InsertIfEmpty(path!(log_schema().host_key()))), + log_schema().host_key().map(LegacyKey::InsertIfEmpty), path!("host"), hostname.clone(), ); @@ -756,7 +759,10 @@ mod tests { ); let log = event.as_log(); - assert_eq!(log[log_schema().host_key()], "Some.Machine".into()); + assert_eq!( + log[log_schema().host_key().unwrap().to_string().as_str()], + "Some.Machine".into() + ); assert_eq!(log[STREAM_KEY], STDOUT.into()); assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); @@ -840,7 +846,10 @@ mod tests { ); let log = event.as_log(); - assert_eq!(log[log_schema().host_key()], "Some.Machine".into()); + assert_eq!( + log[log_schema().host_key().unwrap().to_string().as_str()], + "Some.Machine".into() + ); assert_eq!(log[STREAM_KEY], STDOUT.into()); assert_eq!(log[PID_KEY], (8888_i64).into()); assert_eq!(log[COMMAND_KEY], config.command.into()); @@ -1052,7 +1061,10 @@ mod tests { "exec".into() ); assert_eq!(log[log_schema().message_key()], "Hello World!".into()); - assert_eq!(log[log_schema().host_key()], "Some.Machine".into()); + assert_eq!( + log[log_schema().host_key().unwrap().to_string().as_str()], + "Some.Machine".into() + ); assert!(log.get(PID_KEY).is_some()); assert!(log .get(( diff --git a/src/sources/file.rs b/src/sources/file.rs index 230e8aac42ffb..2f1b3419e2c5d 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -253,7 +253,7 @@ fn default_file_key() -> OptionalValuePath { } fn default_host_key() -> OptionalValuePath { - OptionalValuePath::from(owned_value_path!(log_schema().host_key())) + log_schema().host_key().cloned().into() } const fn default_read_from() -> ReadFromConfig { @@ -1469,7 +1469,7 @@ mod tests { .path .expect("file key to exist") .to_string(), - log_schema().host_key().to_string(), + log_schema().host_key().unwrap().to_string(), log_schema().message_key().to_string(), log_schema().timestamp_key().unwrap().to_string(), log_schema().source_type_key().unwrap().to_string() diff --git a/src/sources/file_descriptors/mod.rs b/src/sources/file_descriptors/mod.rs index 04189d33e871c..3f7286b3eb38f 100644 --- a/src/sources/file_descriptors/mod.rs +++ b/src/sources/file_descriptors/mod.rs @@ -8,10 +8,7 @@ use codecs::{ StreamDecodingError, }; use futures::{channel::mpsc, executor, SinkExt, StreamExt}; -use lookup::{ - lookup_v2::{parse_value_path, OptionalValuePath}, - owned_value_path, path, OwnedValuePath, -}; +use lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath}; use tokio_util::{codec::FramedRead, io::StreamReader}; use vector_common::internal_event::{ ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, @@ -53,10 +50,10 @@ pub trait FileDescriptorConfig: NamedComponent { where R: Send + io::BufRead + 'static, { - let host_key = self.host_key().map_or_else( - || parse_value_path(log_schema().host_key()).ok(), - |k| k.path, - ); + let host_key = self + .host_key() + .and_then(|k| k.path) + .or(log_schema().host_key().cloned()); let hostname = crate::get_hostname().ok(); let description = self.description(); @@ -211,17 +208,14 @@ fn outputs( decoding: &DeserializerConfig, source_name: &'static str, ) -> Vec { - let legacy_host_key = Some(LegacyKey::InsertIfEmpty( - host_key.clone().and_then(|k| k.path).unwrap_or_else(|| { - parse_value_path(log_schema().host_key()).expect("log_schema.host_key to be valid path") - }), - )); - let schema_definition = decoding .schema_definition(log_namespace) .with_source_metadata( source_name, - legacy_host_key, + host_key + .clone() + .map_or(log_schema().host_key().cloned(), |key| key.path) + .map(LegacyKey::Overwrite), &owned_value_path!("host"), Kind::bytes(), Some("host"), diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index e6fbb45b31d75..1980fd514ae1f 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -135,8 +135,9 @@ impl FluentConfig { /// Builds the `schema::Definition` for this source using the provided `LogNamespace`. fn schema_definition(&self, log_namespace: LogNamespace) -> Definition { // `host_key` is only inserted if not present already. - let host_key = parse_value_path(log_schema().host_key()) - .ok() + let host_key = log_schema() + .host_key() + .cloned() .map(LegacyKey::InsertIfEmpty); let tag_key = parse_value_path("tag").ok().map(LegacyKey::Overwrite); @@ -209,7 +210,7 @@ impl FluentSource { fn new(log_namespace: LogNamespace) -> Self { Self { log_namespace, - legacy_host_key_path: parse_value_path(log_schema().host_key()).ok(), + legacy_host_key_path: log_schema().host_key().cloned(), } } } diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index c2a6b7959ac44..b08a311a5364a 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -95,9 +95,10 @@ impl LogplexConfig { ) .with_source_metadata( LogplexConfig::NAME, - Some(LegacyKey::InsertIfEmpty(owned_value_path!( - log_schema().host_key() - ))), + log_schema() + .host_key() + .cloned() + .map(LegacyKey::InsertIfEmpty), &owned_value_path!("host"), Kind::bytes(), Some("host"), @@ -324,7 +325,7 @@ fn line_to_events( let mut buffer = BytesMut::new(); buffer.put(message.as_bytes()); - let legacy_host_key = parse_value_path(log_schema().host_key()).ok(); + let legacy_host_key = log_schema().host_key().cloned(); let legacy_app_key = parse_value_path("app_name").ok(); let legacy_proc_key = parse_value_path("proc_id").ok(); @@ -530,7 +531,7 @@ mod tests { .unwrap() .into() ); - assert_eq!(log[&log_schema().host_key()], "host".into()); + assert_eq!(log[log_schema().host_key().unwrap().to_string()], "host".into()); assert_eq!(log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into()); assert_eq!(log["appname"], "lumberjack-store".into()); assert_eq!(log["absent"], Value::Null); @@ -613,7 +614,10 @@ mod tests { .unwrap() .into() ); - assert_eq!(log[log_schema().host_key()], "host".into()); + assert_eq!( + log[log_schema().host_key().unwrap().to_string().as_str()], + "host".into() + ); assert_eq!( log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into() @@ -658,7 +662,10 @@ mod tests { .unwrap() .into() ); - assert_eq!(log[log_schema().host_key()], "host".into()); + assert_eq!( + log[log_schema().host_key().unwrap().to_string().as_str()], + "host".into() + ); assert_eq!( log[log_schema().source_type_key().unwrap().to_string()], "heroku_logs".into() diff --git a/src/sources/internal_logs.rs b/src/sources/internal_logs.rs index d74a97788a3b5..1034a37b508ef 100644 --- a/src/sources/internal_logs.rs +++ b/src/sources/internal_logs.rs @@ -53,7 +53,7 @@ pub struct InternalLogsConfig { } fn default_host_key() -> OptionalValuePath { - OptionalValuePath::from(owned_value_path!(log_schema().host_key())) + log_schema().host_key().cloned().into() } fn default_pid_key() -> OptionalValuePath { diff --git a/src/sources/internal_metrics.rs b/src/sources/internal_metrics.rs index 2b876aeaa8397..ddf09afb095cc 100644 --- a/src/sources/internal_metrics.rs +++ b/src/sources/internal_metrics.rs @@ -1,6 +1,7 @@ use std::time::Duration; use futures::StreamExt; +use lookup::lookup_v2::OptionalValuePath; use serde_with::serde_as; use tokio::time; use tokio_stream::wrappers::IntervalStream; @@ -64,7 +65,7 @@ pub struct TagsConfig { /// /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key #[serde(default = "default_host_key")] - pub host_key: String, + pub host_key: OptionalValuePath, /// Sets the name of the tag to use to add the current process ID to each metric. /// @@ -91,8 +92,8 @@ fn default_namespace() -> String { "vector".to_owned() } -fn default_host_key() -> String { - log_schema().host_key().to_owned() +fn default_host_key() -> OptionalValuePath { + log_schema().host_key().cloned().into() } impl_generate_config_from_default!(InternalMetricsConfig); @@ -111,10 +112,7 @@ impl SourceConfig for InternalMetricsConfig { // namespace for created metrics is already "vector" by default. let namespace = self.namespace.clone(); - let host_key = match self.tags.host_key.as_str() { - "" => None, - s => Some(s.to_owned()), - }; + let host_key = self.tags.host_key.clone(); let pid_key = self .tags @@ -147,7 +145,7 @@ impl SourceConfig for InternalMetricsConfig { struct InternalMetrics<'a> { namespace: String, - host_key: Option, + host_key: OptionalValuePath, pid_key: Option, controller: &'a Controller, interval: time::Duration, @@ -179,9 +177,9 @@ impl<'a> InternalMetrics<'a> { metric = metric.with_namespace(Some(self.namespace.clone())); } - if let Some(host_key) = &self.host_key { + if let Some(host_key) = &self.host_key.path { if let Ok(hostname) = &hostname { - metric.replace_tag(host_key.to_owned(), hostname.to_owned()); + metric.replace_tag(host_key.to_string(), hostname.to_owned()); } } if let Some(pid_key) = &self.pid_key { @@ -317,7 +315,7 @@ mod tests { async fn sets_tags() { let event = event_from_config(InternalMetricsConfig { tags: TagsConfig { - host_key: String::from("my_host_key"), + host_key: OptionalValuePath::new("my_host_key"), pid_key: Some(String::from("my_pid_key")), }, ..Default::default() diff --git a/src/sources/journald.rs b/src/sources/journald.rs index c28faf4318e24..20c55b32d86e5 100644 --- a/src/sources/journald.rs +++ b/src/sources/journald.rs @@ -12,7 +12,7 @@ use bytes::Bytes; use chrono::{TimeZone, Utc}; use codecs::{decoding::BoxedFramingError, CharacterDelimitedDecoder}; use futures::{poll, stream::BoxStream, task::Poll, StreamExt}; -use lookup::{lookup_v2::parse_value_path, metadata_path, owned_value_path, path, PathPrefix}; +use lookup::{metadata_path, owned_value_path, path, PathPrefix}; use nix::{ sys::signal::{kill, Signal}, unistd::Pid, @@ -270,9 +270,7 @@ impl JournaldConfig { ) .with_source_metadata( JournaldConfig::NAME, - parse_value_path(log_schema().host_key()) - .ok() - .map(LegacyKey::Overwrite), + log_schema().host_key().cloned().map(LegacyKey::Overwrite), &owned_value_path!("host"), Kind::bytes().or_undefined(), Some("host"), @@ -709,10 +707,7 @@ fn enrich_log_event(log: &mut LogEvent, log_namespace: LogNamespace) { log_namespace.insert_source_metadata( JournaldConfig::NAME, log, - parse_value_path(log_schema().host_key()) - .ok() - .as_ref() - .map(LegacyKey::Overwrite), + log_schema().host_key().map(LegacyKey::Overwrite), path!("host"), host, ); diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index 2cbabbfacbc4c..1230884a4d951 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -9,7 +9,6 @@ use std::{ use bytes::{Buf, Bytes, BytesMut}; use codecs::{BytesDeserializerConfig, StreamDecodingError}; use flate2::read::ZlibDecoder; -use lookup::lookup_v2::parse_value_path; use lookup::{event_path, metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix}; use smallvec::{smallvec, SmallVec}; use snafu::{ResultExt, Snafu}; @@ -74,8 +73,9 @@ impl LogstashConfig { /// Builds the `schema::Definition` for this source using the provided `LogNamespace`. fn schema_definition(&self, log_namespace: LogNamespace) -> Definition { // `host_key` is only inserted if not present already. - let host_key = parse_value_path(log_schema().host_key()) - .ok() + let host_key = log_schema() + .host_key() + .cloned() .map(LegacyKey::InsertIfEmpty); let tls_client_metadata_path = self @@ -139,7 +139,7 @@ impl SourceConfig for LogstashConfig { let log_namespace = cx.log_namespace(self.log_namespace); let source = LogstashSource { timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()), - legacy_host_key_path: parse_value_path(log_schema().host_key()).ok(), + legacy_host_key_path: log_schema().host_key().cloned(), log_namespace, }; let shutdown_secs = Duration::from_secs(30); diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 910bf6617a697..4af7b22becd4a 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -315,7 +315,7 @@ impl SourceConfig for SocketConfig { } pub(crate) fn default_host_key() -> OptionalValuePath { - OptionalValuePath::from(owned_value_path!(log_schema().host_key())) + log_schema().host_key().cloned().into() } #[cfg(test)] diff --git a/src/sources/socket/udp.rs b/src/sources/socket/udp.rs index 1951cc9ff01ff..a2793a6bae0cd 100644 --- a/src/sources/socket/udp.rs +++ b/src/sources/socket/udp.rs @@ -89,7 +89,7 @@ pub struct UdpConfig { } fn default_host_key() -> OptionalValuePath { - OptionalValuePath::from(owned_value_path!(log_schema().host_key())) + log_schema().host_key().cloned().into() } fn default_port_key() -> OptionalValuePath { diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index fa0a24564754e..6360de2c82650 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -10,6 +10,7 @@ use chrono::{DateTime, TimeZone, Utc}; use flate2::read::MultiGzDecoder; use futures::FutureExt; use http::StatusCode; +use lookup::lookup_v2::OptionalValuePath; use lookup::{event_path, owned_value_path}; use serde::Serialize; use serde_json::{de::Read as JsonRead, Deserializer, Value as JsonValue}; @@ -200,9 +201,10 @@ impl SourceConfig for SplunkConfig { .with_standard_vector_source_metadata() .with_source_metadata( SplunkConfig::NAME, - Some(LegacyKey::Overwrite(owned_value_path!( - log_schema().host_key() - ))), + log_schema() + .host_key() + .cloned() + .map(LegacyKey::InsertIfEmpty), &owned_value_path!("host"), Kind::bytes(), Some("host"), @@ -635,15 +637,19 @@ impl<'de, R: JsonRead<'de>> EventIterator<'de, R> { // 3. Use the `remote`: SocketAddr value provided by warp DefaultExtractor::new_with( "host", - log_schema().host_key(), + log_schema().host_key().cloned().into(), remote_addr .or_else(|| remote.map(|addr| addr.to_string())) .map(Value::from), log_namespace, ), - DefaultExtractor::new("index", INDEX, log_namespace), - DefaultExtractor::new("source", SOURCE, log_namespace), - DefaultExtractor::new("sourcetype", SOURCETYPE, log_namespace), + DefaultExtractor::new("index", OptionalValuePath::new(INDEX), log_namespace), + DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), log_namespace), + DefaultExtractor::new( + "sourcetype", + OptionalValuePath::new(SOURCETYPE), + log_namespace, + ), ], batch, token, @@ -891,13 +897,17 @@ fn parse_timestamp(t: i64) -> Option> { /// Maintains last known extracted value of field and uses it in the absence of field. struct DefaultExtractor { field: &'static str, - to_field: &'static str, + to_field: OptionalValuePath, value: Option, log_namespace: LogNamespace, } impl DefaultExtractor { - const fn new(field: &'static str, to_field: &'static str, log_namespace: LogNamespace) -> Self { + const fn new( + field: &'static str, + to_field: OptionalValuePath, + log_namespace: LogNamespace, + ) -> Self { DefaultExtractor { field, to_field, @@ -908,7 +918,7 @@ impl DefaultExtractor { fn new_with( field: &'static str, - to_field: &'static str, + to_field: OptionalValuePath, value: impl Into>, log_namespace: LogNamespace, ) -> Self { @@ -928,13 +938,15 @@ impl DefaultExtractor { // Add data field if let Some(index) = self.value.as_ref() { - self.log_namespace.insert_source_metadata( - SplunkConfig::NAME, - log, - Some(LegacyKey::Overwrite(lookup::path!(self.to_field))), - lookup::path!(self.to_field), - index.clone(), - ) + if let Some(metadata_key) = self.to_field.path.as_ref() { + self.log_namespace.insert_source_metadata( + SplunkConfig::NAME, + log, + Some(LegacyKey::Overwrite(metadata_key)), + &self.to_field.path.clone().unwrap_or(owned_value_path!("")), + index.clone(), + ) + } } } } @@ -1009,7 +1021,7 @@ fn raw_event( log_namespace.insert_source_metadata( SplunkConfig::NAME, &mut log, - Some(LegacyKey::Overwrite(log_schema().host_key())), + log_schema().host_key().map(LegacyKey::InsertIfEmpty), "host", host, ); @@ -1196,6 +1208,7 @@ mod tests { use serde::Deserialize; use vector_common::sensitive_string::SensitiveString; use vector_core::{event::EventStatus, schema::Definition}; + use vrl::path::PathPrefix; use super::*; use crate::sinks::splunk_hec::common::config_timestamp_key; @@ -1707,7 +1720,10 @@ mod tests { ); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().host_key()], "10.0.0.1".into()); + assert_eq!( + event.as_log()[log_schema().host_key().unwrap().to_string().as_str()], + "10.0.0.1".into() + ); }) .await; } @@ -1730,7 +1746,10 @@ mod tests { ); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().host_key()], "10.1.0.2".into()); + assert_eq!( + event.as_log()[log_schema().host_key().unwrap().to_string().as_str()], + "10.1.0.2".into() + ); }) .await; } @@ -1753,7 +1772,10 @@ mod tests { ); let event = collect_n(source, 1).await.remove(0); - assert_eq!(event.as_log()[log_schema().host_key()], "10.0.0.1".into()); + assert_eq!( + event.as_log()[log_schema().host_key().unwrap().to_string().as_str()], + "10.0.0.1".into() + ); }) .await; } @@ -2156,7 +2178,10 @@ mod tests { let event = channel_n(vec![message], sink, source).await.remove(0); assert_eq!(event.as_log()[log_schema().message_key()], message.into()); - assert!(event.as_log().get(log_schema().host_key()).is_none()); + assert!(event + .as_log() + .get((PathPrefix::Event, log_schema().host_key().unwrap())) + .is_none()); }) .await; } diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 69b12a7ee3368..0d57a77903020 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -10,10 +10,7 @@ use codecs::{ }; use futures::StreamExt; use listenfd::ListenFd; -use lookup::{ - lookup_v2::{parse_value_path, OptionalValuePath}, - path, OwnedValuePath, PathPrefix, -}; +use lookup::{lookup_v2::OptionalValuePath, path, OwnedValuePath, PathPrefix}; use smallvec::SmallVec; use tokio_util::udp::UdpFramed; use vector_config::configurable_component; @@ -163,10 +160,11 @@ impl GenerateConfig for SyslogConfig { impl SourceConfig for SyslogConfig { async fn build(&self, cx: SourceContext) -> crate::Result { let log_namespace = cx.log_namespace(self.log_namespace); - let host_key = self.host_key.clone().map_or_else( - || parse_value_path(log_schema().host_key()).ok(), - |k| k.path, - ); + let host_key = self + .host_key + .clone() + .and_then(|k| k.path) + .or(log_schema().host_key().cloned()); match self.mode.clone() { Mode::Tcp { @@ -864,7 +862,10 @@ mod test { .single() .expect("invalid timestamp"), ); - expected.insert(log_schema().host_key(), "74794bfb6795"); + expected.insert( + log_schema().host_key().unwrap().to_string().as_str(), + "74794bfb6795", + ); expected.insert("hostname", "74794bfb6795"); expected.insert( (PathPrefix::Event, log_schema().source_type_key().unwrap()), @@ -1008,7 +1009,10 @@ mod test { ), expected_date, ); - expected.insert(log_schema().host_key(), "74794bfb6795"); + expected.insert( + log_schema().host_key().unwrap().to_string().as_str(), + "74794bfb6795", + ); expected.insert( (PathPrefix::Event, log_schema().source_type_key().unwrap()), "syslog", diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index 513a91ce9115e..ae7f94604ea6b 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -103,10 +103,10 @@ fn default_cache_config() -> CacheConfig { // structure can vary significantly. This should probably either become a required field // in the future, or maybe the "semantic meaning" can be utilized here. fn default_match_fields() -> Vec { - let mut fields = vec![ - log_schema().host_key().into(), - log_schema().message_key().into(), - ]; + let mut fields = vec![log_schema().message_key().into()]; + if let Some(host_key) = log_schema().host_key() { + fields.push(host_key.to_string()); + } if let Some(timestamp_key) = log_schema().timestamp_key() { fields.push(timestamp_key.to_string()); } diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs index c1cc98594fb8b..c28d4f1a7387a 100644 --- a/src/transforms/metric_to_log.rs +++ b/src/transforms/metric_to_log.rs @@ -1,6 +1,5 @@ use chrono::Utc; use codecs::MetricTagValues; -use lookup::lookup_v2::parse_value_path; use lookup::{event_path, owned_value_path, path, PathPrefix}; use serde_json::Value; use std::collections::{BTreeMap, BTreeSet}; @@ -242,7 +241,7 @@ fn schema_definition(log_namespace: LogNamespace) -> Definition { } schema_definition = schema_definition.with_event_field( - &parse_value_path(log_schema().host_key()).expect("valid host key"), + log_schema().host_key().unwrap(), Kind::bytes().or_undefined(), None, ); @@ -269,7 +268,13 @@ impl MetricToLog { Self { host_tag: format!( "tags.{}", - host_tag.unwrap_or_else(|| log_schema().host_key()) + host_tag.unwrap_or( + log_schema() + .host_key() + .map(ToString::to_string) + .unwrap_or_default() + .as_str() + ) ), timezone, log_namespace, @@ -311,7 +316,9 @@ impl MetricToLog { } if let Some(host) = log.remove_prune(self.host_tag.as_str(), true) { - log.insert(log_schema().host_key(), host); + if let Some(host_key) = log_schema().host_key() { + log.insert((PathPrefix::Event, host_key), host); + } } } if self.log_namespace == LogNamespace::Vector { From 18d474bccb47134330028de6f2f99dc4c833aa42 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 17 Jul 2023 10:57:56 -0400 Subject: [PATCH 2/9] use expect --- src/sinks/influxdb/logs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index bc2936a022db5..78ed0155367ba 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -191,7 +191,7 @@ impl SinkConfig for InfluxDbLogsConfig { .clone() .and_then(|k| k.path) .or(log_schema().host_key().cloned()) - .unwrap(); + .expect("global log_schema.host_key to be valid path"); let message_key = self .message_key From b8b81f3b13ffc746d75b2a5cb632afb5ba95e894 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 17 Jul 2023 13:19:29 -0400 Subject: [PATCH 3/9] Update src/sinks/humio/mod.rs Co-authored-by: Stephen Wakely --- src/sinks/humio/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/humio/mod.rs b/src/sinks/humio/mod.rs index df62b1118d0a5..edfd800b4ecdb 100644 --- a/src/sinks/humio/mod.rs +++ b/src/sinks/humio/mod.rs @@ -2,5 +2,5 @@ pub mod logs; pub mod metrics; fn host_key() -> String { - crate::config::log_schema().host_key().unwrap().to_string() + crate::config::log_schema().host_key().unwrap_or_default().to_string() } From 4873b450c32859215d0c48ad6d4d6ac48f46158a Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 17 Jul 2023 13:19:42 -0400 Subject: [PATCH 4/9] Update src/sinks/splunk_hec/common/util.rs Co-authored-by: Stephen Wakely --- src/sinks/splunk_hec/common/util.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs index ce2a251e736e9..787365af5b4f9 100644 --- a/src/sinks/splunk_hec/common/util.rs +++ b/src/sinks/splunk_hec/common/util.rs @@ -133,7 +133,7 @@ pub fn build_uri( } pub fn host_key() -> String { - crate::config::log_schema().host_key().unwrap().to_string() + crate::config::log_schema().host_key().unwrap_or_default().to_string() } pub fn config_timestamp_key() -> OptionalValuePath { From 52eeb1c5b15d528bd2bd56cbf8b74cb5715aee13 Mon Sep 17 00:00:00 2001 From: Pavlos Rontidis Date: Mon, 17 Jul 2023 13:38:17 -0400 Subject: [PATCH 5/9] Replaced many `String`s with `OptionalValuePath`. This eliminates some unwraps. --- src/sinks/humio/logs.rs | 2 +- src/sinks/humio/metrics.rs | 2 +- src/sinks/humio/mod.rs | 8 ++++++-- src/sinks/splunk_hec/common/util.rs | 6 ++++-- src/sinks/splunk_hec/logs/config.rs | 10 ++++----- src/sinks/splunk_hec/logs/sink.rs | 20 ++++++++++-------- src/sinks/splunk_hec/logs/tests.rs | 7 +++++-- src/sinks/splunk_hec/metrics/config.rs | 11 +++++----- src/sinks/splunk_hec/metrics/sink.rs | 11 +++++----- src/sinks/splunk_hec/metrics/tests.rs | 8 +++++--- src/sources/datadog_agent/metrics.rs | 12 ++++++----- src/sources/splunk_hec/mod.rs | 4 ++-- src/transforms/metric_to_log.rs | 28 ++++++++++++++------------ 13 files changed, 75 insertions(+), 54 deletions(-) diff --git a/src/sinks/humio/logs.rs b/src/sinks/humio/logs.rs index 7a4b94a7bfc54..062d017596ff2 100644 --- a/src/sinks/humio/logs.rs +++ b/src/sinks/humio/logs.rs @@ -75,7 +75,7 @@ pub struct HumioLogsConfig { /// /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key #[serde(default = "host_key")] - pub(super) host_key: String, + pub(super) host_key: OptionalValuePath, /// Event fields to be added to Humio’s extra fields. /// diff --git a/src/sinks/humio/metrics.rs b/src/sinks/humio/metrics.rs index a336f60590aa9..f45a9e995ffbc 100644 --- a/src/sinks/humio/metrics.rs +++ b/src/sinks/humio/metrics.rs @@ -87,7 +87,7 @@ pub struct HumioMetricsConfig { /// /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key #[serde(default = "host_key")] - host_key: String, + host_key: OptionalValuePath, /// Event fields to be added to Humio’s extra fields. /// diff --git a/src/sinks/humio/mod.rs b/src/sinks/humio/mod.rs index edfd800b4ecdb..7b84dec955fa9 100644 --- a/src/sinks/humio/mod.rs +++ b/src/sinks/humio/mod.rs @@ -1,6 +1,10 @@ +use lookup::lookup_v2::OptionalValuePath; + pub mod logs; pub mod metrics; -fn host_key() -> String { - crate::config::log_schema().host_key().unwrap_or_default().to_string() +pub fn host_key() -> OptionalValuePath { + OptionalValuePath { + path: crate::config::log_schema().host_key().cloned(), + } } diff --git a/src/sinks/splunk_hec/common/util.rs b/src/sinks/splunk_hec/common/util.rs index 787365af5b4f9..afdb097b49304 100644 --- a/src/sinks/splunk_hec/common/util.rs +++ b/src/sinks/splunk_hec/common/util.rs @@ -132,8 +132,10 @@ pub fn build_uri( uri.parse::() } -pub fn host_key() -> String { - crate::config::log_schema().host_key().unwrap_or_default().to_string() +pub fn config_host_key() -> OptionalValuePath { + OptionalValuePath { + path: crate::config::log_schema().host_key().cloned(), + } } pub fn config_timestamp_key() -> OptionalValuePath { diff --git a/src/sinks/splunk_hec/logs/config.rs b/src/sinks/splunk_hec/logs/config.rs index 64299669e388b..a205f1a0110de 100644 --- a/src/sinks/splunk_hec/logs/config.rs +++ b/src/sinks/splunk_hec/logs/config.rs @@ -17,7 +17,7 @@ use crate::{ sinks::{ splunk_hec::common::{ acknowledgements::HecClientAcknowledgementsConfig, - build_healthcheck, build_http_batch_service, create_client, host_key, + build_healthcheck, build_http_batch_service, config_host_key, create_client, service::{HecService, HttpRequestBuilder}, EndpointTarget, SplunkHecDefaultBatchSettings, }, @@ -64,8 +64,8 @@ pub struct HecLogsSinkConfig { /// /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key #[configurable(metadata(docs::advanced))] - #[serde(default = "host_key")] - pub host_key: String, + #[serde(default = "config_host_key")] + pub host_key: OptionalValuePath, /// Fields to be [added to Splunk index][splunk_field_index_docs]. /// @@ -165,7 +165,7 @@ impl GenerateConfig for HecLogsSinkConfig { toml::Value::try_from(Self { default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(), endpoint: "endpoint".to_owned(), - host_key: host_key(), + host_key: config_host_key(), indexed_fields: vec![], index: None, sourcetype: None, @@ -273,7 +273,7 @@ impl HecLogsSinkConfig { source: self.source.clone(), index: self.index.clone(), indexed_fields: self.indexed_fields.clone(), - host: self.host_key.clone(), + host_key: self.host_key.path.clone(), timestamp_nanos_key: self.timestamp_nanos_key.clone(), timestamp_key: self.timestamp_key.path.clone(), endpoint_target: self.endpoint_target, diff --git a/src/sinks/splunk_hec/logs/sink.rs b/src/sinks/splunk_hec/logs/sink.rs index 11d3f260ed022..595f8f2119dce 100644 --- a/src/sinks/splunk_hec/logs/sink.rs +++ b/src/sinks/splunk_hec/logs/sink.rs @@ -39,7 +39,7 @@ pub struct HecLogsSink { pub source: Option