Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions lib/vector-core/src/config/log_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct LogSchema {
/// This field will generally represent a real host, or container, that generated the message,
/// but is somewhat source-dependent.
#[serde(default = "LogSchema::default_host_key")]
host_key: String,
host_key: OptionalValuePath,

/// The name of the event field to set the source identifier in.
///
Expand Down Expand Up @@ -92,8 +92,8 @@ impl LogSchema {
OptionalValuePath::new("timestamp")
}

fn default_host_key() -> String {
String::from("host")
fn default_host_key() -> OptionalValuePath {
OptionalValuePath::new("host")
}

fn default_source_type_key() -> OptionalValuePath {
Expand Down Expand Up @@ -121,8 +121,8 @@ impl LogSchema {
self.timestamp_key.path.as_ref()
}

pub fn host_key(&self) -> &str {
&self.host_key
pub fn host_key(&self) -> Option<&OwnedValuePath> {
self.host_key.path.as_ref()
}

pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
Expand All @@ -141,8 +141,8 @@ impl LogSchema {
self.timestamp_key = OptionalValuePath { path: v };
}

pub fn set_host_key(&mut self, v: String) {
self.host_key = v;
pub fn set_host_key(&mut self, path: Option<OwnedValuePath>) {
self.host_key = OptionalValuePath { path };
}

pub fn set_source_type_key(&mut self, path: Option<OwnedValuePath>) {
Expand All @@ -169,7 +169,7 @@ impl LogSchema {
{
errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
} else {
self.set_host_key(other.host_key().to_string());
self.set_host_key(other.host_key().cloned());
}
if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
&& self.message_key() != other.message_key()
Expand Down
6 changes: 4 additions & 2 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ impl LogEvent {
pub fn host_path(&self) -> Option<String> {
match self.namespace() {
LogNamespace::Vector => self.find_key_by_meaning("host"),
LogNamespace::Legacy => Some(log_schema().host_key().to_owned()),
LogNamespace::Legacy => log_schema().host_key().map(ToString::to_string),
}
}

Expand Down Expand Up @@ -505,7 +505,9 @@ impl LogEvent {
pub fn get_host(&self) -> Option<&Value> {
match self.namespace() {
LogNamespace::Vector => self.get_by_meaning("host"),
LogNamespace::Legacy => self.get((PathPrefix::Event, log_schema().host_key())),
LogNamespace::Legacy => log_schema()
.host_key()
.and_then(|key| self.get((PathPrefix::Event, key))),
}
}

Expand Down
8 changes: 8 additions & 0 deletions lib/vector-lookup/src/lookup_v2/optional_path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,11 @@ impl From<OwnedValuePath> for OptionalValuePath {
Self { path: Some(path) }
}
}

impl From<Option<OwnedValuePath>> for OptionalValuePath {
fn from(value: Option<OwnedValuePath>) -> Self {
value.map_or(OptionalValuePath::none(), |path| {
OptionalValuePath::from(path)
})
}
}
10 changes: 8 additions & 2 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,10 @@ mod tests {
)
.unwrap();

assert_eq!("host", config.global.log_schema.host_key().to_string());
assert_eq!(
"host",
config.global.log_schema.host_key().unwrap().to_string()
);
assert_eq!(
"message",
config.global.log_schema.message_key().to_string()
Expand Down Expand Up @@ -879,7 +882,10 @@ mod tests {
)
.unwrap();

assert_eq!("this", config.global.log_schema.host_key().to_string());
assert_eq!(
"this",
config.global.log_schema.host_key().unwrap().to_string()
);
assert_eq!("that", config.global.log_schema.message_key().to_string());
assert_eq!(
"then",
Expand Down
10 changes: 8 additions & 2 deletions src/sinks/datadog/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,10 @@ fn sketch_to_proto_message(
let name = get_namespaced_name(metric, default_namespace);
let ts = encode_timestamp(metric.timestamp());
let mut tags = metric.tags().cloned().unwrap_or_default();
let host = tags.remove(log_schema.host_key()).unwrap_or_default();
let host = log_schema
.host_key()
.map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
.unwrap_or_default();
let tags = encode_tags(&tags);

let cnt = ddsketch.count() as i64;
Expand Down Expand Up @@ -497,7 +500,10 @@ fn generate_series_metrics(
let name = get_namespaced_name(metric, default_namespace);

let mut tags = metric.tags().cloned().unwrap_or_default();
let host = tags.remove(log_schema.host_key());
let host = log_schema
.host_key()
.map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());

let source_type_name = tags.remove("source_type_name");
let device = tags.remove("device");
let ts = encode_timestamp(metric.timestamp());
Expand Down
13 changes: 9 additions & 4 deletions src/sinks/datadog/traces/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use futures_util::{
};
use tokio::sync::oneshot::{channel, Sender};
use tower::Service;
use vrl::path::PathPrefix;

use vector_core::{
config::log_schema,
event::Event,
Expand All @@ -15,11 +17,13 @@ use vector_core::{
stream::{BatcherSettings, DriverResponse},
};

use super::service::TraceApiRequest;
use crate::{
internal_events::DatadogTracesEncodingError,
sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt},
};

use super::service::TraceApiRequest;

#[derive(Default)]
struct EventPartitioner;

Expand Down Expand Up @@ -51,9 +55,10 @@ impl Partitioner for EventPartitioner {
Event::Trace(t) => PartitionKey {
api_key: item.metadata().datadog_api_key(),
env: t.get("env").map(|s| s.to_string_lossy().into_owned()),
hostname: t
.get(log_schema().host_key())
.map(|s| s.to_string_lossy().into_owned()),
hostname: log_schema().host_key().and_then(|key| {
t.get((PathPrefix::Event, key))
.map(|s| s.to_string_lossy().into_owned())
}),
agent_version: t
.get("agent_version")
.map(|s| s.to_string_lossy().into_owned()),
Expand Down
23 changes: 13 additions & 10 deletions src/sinks/humio/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use lookup::lookup_v2::OptionalValuePath;
use vector_common::sensitive_string::SensitiveString;
use vector_config::configurable_component;

use super::host_key;
use super::config_host_key;
use crate::sinks::splunk_hec::common::config_timestamp_key;
use crate::{
codecs::EncodingConfig,
Expand Down Expand Up @@ -74,8 +74,8 @@ pub struct HumioLogsConfig {
/// By default, the [global `log_schema.host_key` option][global_host_key] is used.
///
/// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
#[serde(default = "host_key")]
pub(super) host_key: String,
#[serde(default = "config_host_key")]
pub(super) host_key: OptionalValuePath,

/// Event fields to be added to Humio’s extra fields.
///
Expand Down Expand Up @@ -154,7 +154,7 @@ impl GenerateConfig for HumioLogsConfig {
event_type: None,
indexed_fields: vec![],
index: None,
host_key: host_key(),
host_key: config_host_key(),
compression: Compression::default(),
request: TowerRequestConfig::default(),
batch: BatchConfig::default(),
Expand Down Expand Up @@ -231,6 +231,7 @@ mod integration_tests {
use serde::Deserialize;
use serde_json::{json, Value as JsonValue};
use tokio::time::Duration;
use vrl::path::PathPrefix;

use super::*;
use crate::{
Expand Down Expand Up @@ -262,14 +263,14 @@ mod integration_tests {
let message = random_string(100);
let host = "192.168.1.1".to_string();
let mut event = LogEvent::from(message.clone());
event.insert(log_schema().host_key(), host.clone());
event.insert(
(PathPrefix::Event, log_schema().host_key().unwrap()),
host.clone(),
);

let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
event.insert(
(
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
),
(PathPrefix::Event, log_schema().timestamp_key().unwrap()),
ts,
);

Expand Down Expand Up @@ -387,7 +388,9 @@ mod integration_tests {
source: None,
encoding: JsonSerializerConfig::default().into(),
event_type: None,
host_key: log_schema().host_key().to_string(),
host_key: OptionalValuePath {
path: log_schema().host_key().cloned(),
},
indexed_fields: vec![],
index: None,
compression: Compression::None,
Expand Down
6 changes: 3 additions & 3 deletions src/sinks/humio/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use vector_config::configurable_component;
use vector_core::sink::StreamSink;

use super::{
host_key,
config_host_key,
logs::{HumioLogsConfig, HOST},
};
use crate::{
Expand Down Expand Up @@ -86,8 +86,8 @@ pub struct HumioMetricsConfig {
/// By default, the [global `log_schema.host_key` option][global_host_key] is used.
///
/// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
#[serde(default = "host_key")]
host_key: String,
#[serde(default = "config_host_key")]
host_key: OptionalValuePath,

/// Event fields to be added to Humio’s extra fields.
///
Expand Down
8 changes: 6 additions & 2 deletions src/sinks/humio/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use lookup::lookup_v2::OptionalValuePath;

pub mod logs;
pub mod metrics;

fn host_key() -> String {
crate::config::log_schema().host_key().to_string()
pub fn config_host_key() -> OptionalValuePath {
OptionalValuePath {
path: crate::config::log_schema().host_key().cloned(),
}
}
24 changes: 14 additions & 10 deletions src/sinks/influxdb/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ use bytes::{Bytes, BytesMut};
use futures::SinkExt;
use http::{Request, Uri};
use indoc::indoc;
use vrl::value::Kind;

use lookup::lookup_v2::{parse_value_path, OptionalValuePath};
use lookup::{OwnedValuePath, PathPrefix};
use vector_config::configurable_component;
use vector_core::config::log_schema;
use vector_core::schema;
use vrl::value::Kind;

use crate::{
codecs::Transformer,
Expand Down Expand Up @@ -189,10 +190,8 @@ impl SinkConfig for InfluxDbLogsConfig {
.host_key
.clone()
.and_then(|k| k.path)
.unwrap_or_else(|| {
parse_value_path(log_schema().host_key())
.expect("global log_schema.host_key to be valid path")
});
.or(log_schema().host_key().cloned())
.expect("global log_schema.host_key to be valid path");

let message_key = self
.message_key
Expand Down Expand Up @@ -409,10 +408,10 @@ mod tests {
use futures::{channel::mpsc, stream, StreamExt};
use http::{request::Parts, StatusCode};
use indoc::indoc;

use lookup::owned_value_path;
use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};

use super::*;
use crate::{
sinks::{
influxdb::test_util::{assert_fields, split_line_protocol, ts},
Expand All @@ -427,6 +426,8 @@ mod tests {
},
};

use super::*;

type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>;

#[test]
Expand Down Expand Up @@ -880,16 +881,17 @@ mod tests {
#[cfg(feature = "influxdb-integration-tests")]
#[cfg(test)]
mod integration_tests {
use std::sync::Arc;

use chrono::Utc;
use codecs::BytesDeserializerConfig;
use futures::stream;
use vrl::value;

use codecs::BytesDeserializerConfig;
use lookup::{owned_value_path, path};
use std::sync::Arc;
use vector_core::config::{LegacyKey, LogNamespace};
use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};
use vrl::value;

use super::*;
use crate::{
config::SinkContext,
sinks::influxdb::{
Expand All @@ -900,6 +902,8 @@ mod integration_tests {
test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
};

use super::*;

#[tokio::test]
async fn influxdb2_logs_put_data() {
let endpoint = address_v2();
Expand Down
6 changes: 4 additions & 2 deletions src/sinks/splunk_hec/common/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,10 @@ pub fn build_uri(
uri.parse::<Uri>()
}

pub fn host_key() -> String {
crate::config::log_schema().host_key().to_string()
pub fn config_host_key() -> OptionalValuePath {
OptionalValuePath {
path: crate::config::log_schema().host_key().cloned(),
}
}

pub fn config_timestamp_key() -> OptionalValuePath {
Expand Down
Loading