Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/opentelemetry-proto/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl ResourceLog {

log_namespace.insert_vector_metadata(
&mut log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(SOURCE_NAME.as_bytes()),
);
Expand Down
25 changes: 12 additions & 13 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use lookup::lookup_v2::{parse_target_path, OptionalValuePath};
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath};
use lookup::lookup_v2::OptionalValuePath;
use lookup::{OwnedTargetPath, OwnedValuePath};
use once_cell::sync::{Lazy, OnceCell};
use vector_config::configurable_component;
use vrl::path::parse_target_path;

static LOG_SCHEMA: OnceCell<LogSchema> = OnceCell::new();
static LOG_SCHEMA_DEFAULT: Lazy<LogSchema> = Lazy::new(LogSchema::default);
Expand Down Expand Up @@ -60,7 +61,7 @@ pub struct LogSchema {
///
/// This field will be set by the Vector source that the event was created in.
#[serde(default = "LogSchema::default_source_type_key")]
source_type_key: String,
source_type_key: OptionalValuePath,

/// The name of the event field to set the event metadata in.
///
Expand Down Expand Up @@ -88,17 +89,15 @@ impl LogSchema {
}

fn default_timestamp_key() -> OptionalValuePath {
OptionalValuePath {
path: Some(owned_value_path!("timestamp")),
}
OptionalValuePath::new("timestamp")
}

fn default_host_key() -> String {
String::from("host")
}

fn default_source_type_key() -> String {
String::from("source_type")
fn default_source_type_key() -> OptionalValuePath {
OptionalValuePath::new("source_type")
}

fn default_metadata_key() -> String {
Expand Down Expand Up @@ -126,8 +125,8 @@ impl LogSchema {
&self.host_key
}

pub fn source_type_key(&self) -> &str {
&self.source_type_key
pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
self.source_type_key.path.as_ref()
}

pub fn metadata_key(&self) -> &str {
Expand All @@ -146,8 +145,8 @@ impl LogSchema {
self.host_key = v;
}

pub fn set_source_type_key(&mut self, v: String) {
self.source_type_key = v;
pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
self.source_type_key = OptionalValuePath { path };
}

pub fn set_metadata_key(&mut self, v: String) {
Expand Down Expand Up @@ -191,7 +190,7 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
} else {
self.set_source_type_key(other.source_type_key().to_string());
self.set_source_type_key(other.source_type_key().cloned());
}
if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
&& self.metadata_key() != other.metadata_key()
Expand Down
7 changes: 4 additions & 3 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ impl LogNamespace {
) {
self.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(source_name.as_bytes()),
);
Expand Down Expand Up @@ -551,14 +551,15 @@ mod test {
use chrono::Utc;
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use vector_common::btreemap;
use vrl::path::OwnedValuePath;
use vrl::value::Kind;

#[test]
fn test_insert_standard_vector_source_metadata() {
let nested_path = "a.b.c.d";
let nested_path = "a.b.c.d".to_string();

let mut schema = LogSchema::default();
schema.set_source_type_key(nested_path.to_owned());
schema.set_source_type_key(Some(OwnedValuePath::try_from(nested_path).unwrap()));
init_log_schema(schema, false);

let namespace = LogNamespace::Legacy;
Expand Down
10 changes: 6 additions & 4 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,10 @@ impl LogEvent {
/// or from the `source_type` key set on the "Global Log Schema" (Legacy namespace).
// TODO: This can eventually return a `&TargetOwnedPath` once Semantic meaning and the
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will tackle this in a follow-up PR.

// "Global Log Schema" are updated to the new path lookup code
pub fn source_type_path(&self) -> &'static str {
pub fn source_type_path(&self) -> Option<String> {
match self.namespace() {
LogNamespace::Vector => "%vector.source_type",
LogNamespace::Legacy => log_schema().source_type_key(),
LogNamespace::Vector => Some("%vector.source_type".to_string()),
LogNamespace::Legacy => log_schema().source_type_key().map(ToString::to_string),
}
}

Expand Down Expand Up @@ -514,7 +514,9 @@ impl LogEvent {
pub fn get_source_type(&self) -> Option<&Value> {
match self.namespace() {
LogNamespace::Vector => self.get(metadata_path!("vector", "source_type")),
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().source_type_key())),
LogNamespace::Legacy => log_schema()
.source_type_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions lib/vector-core/src/schema/definition.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};

use crate::config::{log_schema, LegacyKey, LogNamespace};
use lookup::lookup_v2::{parse_value_path, TargetPath};
use lookup::lookup_v2::TargetPath;
use lookup::{owned_value_path, OwnedTargetPath, OwnedValuePath, PathPrefix};
use vrl::value::{kind::Collection, Kind};

Expand Down Expand Up @@ -144,9 +144,7 @@ impl Definition {
#[must_use]
pub fn with_standard_vector_source_metadata(self) -> Self {
self.with_vector_metadata(
parse_value_path(log_schema().source_type_key())
.ok()
.as_ref(),
log_schema().source_type_key(),
&owned_value_path!("source_type"),
Kind::bytes(),
None,
Expand Down
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/optional_path.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use vector_config::configurable_component;
use vrl::owned_value_path;

use crate::lookup_v2::PathParseError;
use crate::{OwnedTargetPath, OwnedValuePath};
Expand Down Expand Up @@ -56,6 +57,12 @@ impl OptionalValuePath {
pub fn none() -> Self {
Self { path: None }
}

pub fn new(path: &str) -> Self {
Self {
path: Some(owned_value_path!(path)),
}
}
}

impl TryFrom<String> for OptionalValuePath {
Expand Down
4 changes: 3 additions & 1 deletion src/sinks/datadog/events/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ async fn ensure_required_fields(event: Event) -> Option<Event> {
}

if !log.contains("source_type_name") {
log.rename_key(log.source_type_path(), "source_type_name")
if let Some(source_type_path) = log.source_type_path() {
log.rename_key(source_type_path.as_str(), "source_type_name")
}
}

Some(Event::from(log))
Expand Down
20 changes: 11 additions & 9 deletions src/sinks/influxdb/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,8 @@ impl SinkConfig for InfluxDbLogsConfig {
.source_type_key
.clone()
.and_then(|k| k.path)
.unwrap_or_else(|| {
parse_value_path(log_schema().source_type_key())
.expect("global log_schema.source_type_key to be valid path")
});
.or(log_schema().source_type_key().cloned())
.unwrap();

let sink = InfluxDbLogsSink {
uri,
Expand Down Expand Up @@ -280,11 +278,15 @@ impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
self.tags.replace(host_path.clone());
log.rename_key(host_path.as_str(), (PathPrefix::Event, &self.host_key));
}
self.tags.replace(log.source_type_path().to_string());
log.rename_key(
log.source_type_path(),
(PathPrefix::Event, &self.source_type_key),
);

if let Some(source_type_path) = log.source_type_path() {
self.tags.replace(source_type_path.clone());
log.rename_key(
source_type_path.as_str(),
(PathPrefix::Event, &self.source_type_key),
);
}

self.tags.replace("metric_type".to_string());
log.insert("metric_type", "logs");

Expand Down
7 changes: 5 additions & 2 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ fn populate_event(

log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AmqpSourceConfig::NAME.as_bytes()),
);
Expand Down Expand Up @@ -713,7 +713,10 @@ mod integration_test {
trace!("{:?}", log);
assert_eq!(log[log_schema().message_key()], "my message".into());
assert_eq!(log["routing"], routing_key.into());
assert_eq!(log[log_schema().source_type_key()], "amqp".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"amqp".into()
);
let log_ts = log[log_schema().timestamp_key().unwrap().to_string()]
.as_timestamp()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_kinesis_firehose/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub(super) async fn firehose(
if let Event::Log(ref mut log) = event {
log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AwsKinesisFirehoseConfig::NAME.as_bytes()),
);
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ fn handle_single_log(

log_namespace.insert_vector_metadata(
log,
Some(log_schema().source_type_key()),
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(AwsS3Config::NAME.as_bytes()),
);
Expand Down
6 changes: 4 additions & 2 deletions src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ pub struct ApiKeyQueryParams {
pub(crate) struct DatadogAgentSource {
pub(crate) api_key_extractor: ApiKeyExtractor,
pub(crate) log_schema_host_key: &'static str,
pub(crate) log_schema_source_type_key: &'static str,
pub(crate) log_schema_source_type_key: String,
pub(crate) log_namespace: LogNamespace,
pub(crate) decoder: Decoder,
protocol: &'static str,
Expand Down Expand Up @@ -334,7 +334,9 @@ impl DatadogAgentSource {
.expect("static regex always compiles"),
},
log_schema_host_key: log_schema().host_key(),
log_schema_source_type_key: log_schema().source_type_key(),
log_schema_source_type_key: log_schema()
.source_type_key()
.map_or("".to_string(), |key| key.to_string()),
decoder,
protocol,
logs_schema_definition: Arc::new(logs_schema_definition),
Expand Down
40 changes: 32 additions & 8 deletions src/sources/datadog_agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ async fn full_payload_v1() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -300,7 +303,10 @@ async fn full_payload_v2() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -362,7 +368,10 @@ async fn no_api_key() {
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
event.metadata().schema_definition(),
&test_logs_schema_definition()
Expand Down Expand Up @@ -423,7 +432,10 @@ async fn api_key_in_url() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -488,7 +500,10 @@ async fn api_key_in_query_params() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -559,7 +574,10 @@ async fn api_key_in_header() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down Expand Up @@ -706,7 +724,10 @@ async fn ignores_api_key() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert!(event.metadata().datadog_api_key().is_none());
assert_eq!(
event.metadata().schema_definition(),
Expand Down Expand Up @@ -1398,7 +1419,10 @@ async fn split_outputs() {
assert_eq!(log["service"], "vector".into());
assert_eq!(log["ddsource"], "curl".into());
assert_eq!(log["ddtags"], "one,two,three".into());
assert_eq!(log[log_schema().source_type_key()], "datadog_agent".into());
assert_eq!(
log[log_schema().source_type_key().unwrap().to_string()],
"datadog_agent".into()
);
assert_eq!(
&event.metadata().datadog_api_key().as_ref().unwrap()[..],
"12345678abcdefgh12345678abcdefgh"
Expand Down
4 changes: 2 additions & 2 deletions src/sources/datadog_agent/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ fn handle_dd_trace_payload_v1(
.set_datadog_api_key(Arc::clone(k));
}
trace_event.insert(
source.log_schema_source_type_key,
source.log_schema_source_type_key.as_str(),
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v2".to_string());
Expand Down Expand Up @@ -255,7 +255,7 @@ fn handle_dd_trace_payload_v0(
trace_event.insert("language_name", lang.clone());
}
trace_event.insert(
source.log_schema_source_type_key,
source.log_schema_source_type_key.as_str(),
Bytes::from("datadog_agent"),
);
trace_event.insert("payload_version", "v1".to_string());
Expand Down
Loading