Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,12 @@ libc = "0.2.147"
similar-asserts = "1.4.2"
proptest = "1.2"
quickcheck = "1.0.3"
lookup = { package = "vector-lookup", path = "lib/vector-lookup", features = ["test"] }
reqwest = { version = "0.11", features = ["json"] }
tempfile = "3.6.0"
test-generator = "0.3.1"
tokio-test = "0.4.2"
tokio = { version = "1.30.0", features = ["test-util"] }
tokio-test = "0.4.2"
tower-test = "0.4.0"
vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] }
wiremock = "0.5.19"
Expand Down
6 changes: 3 additions & 3 deletions benches/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
|e| e.all_fields().unwrap().count(),
|e| e.all_event_fields().unwrap().count(),
BatchSize::SmallInput,
)
});
Expand All @@ -35,7 +35,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
log.insert(event_path!("key3"), Bytes::from("value3"));
log
},
|e| e.all_fields().unwrap().count(),
|e| e.all_event_fields().unwrap().count(),
BatchSize::SmallInput,
)
});
Expand All @@ -48,7 +48,7 @@ fn benchmark_event_iterate(c: &mut Criterion) {
log.insert(event_path!("key1", "nested1", 1), Bytes::from("value2"));
log
},
|e| e.all_fields().unwrap().count(),
|e| e.all_event_fields().unwrap().count(),
BatchSize::SmallInput,
)
});
Expand Down
52 changes: 50 additions & 2 deletions lib/vector-core/src/event/log_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use super::{
};
use crate::config::LogNamespace;
use crate::config::{log_schema, telemetry};
use crate::event::util::log::{all_fields, all_metadata_fields};
use crate::{event::MaybeAsLogMut, ByteSizeOf};
use lookup::{metadata_path, path};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -420,8 +421,21 @@ impl LogEvent {
}
}

pub fn all_fields(&self) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
self.as_map().map(util::log::all_fields)
/// If the event root value is a map, build and return an iterator over event
/// `(dotted-path String, &Value)` pairs for each leaf field; returns `None` otherwise.
/// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
pub fn all_event_fields(&self) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
    self.as_map().map(all_fields)
}

/// If the metadata root value is a map, build and return an iterator over metadata
/// `(path String, &Value)` pairs, each path prefixed with `%` to denote the metadata
/// path type; returns `None` otherwise.
/// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods.
pub fn all_metadata_fields(
    &self,
) -> Option<impl Iterator<Item = (String, &Value)> + Serialize> {
    match self.metadata.value() {
        // `Some(x).map(f)` is just `Some(f(x))` — construct the iterator directly
        // (clippy: unnecessary_map_on_constructor).
        Value::Object(metadata_map) => Some(all_metadata_fields(metadata_map)),
        _ => None,
    }
}

/// Returns an iterator of all fields if the value is an Object. Otherwise,
Expand Down Expand Up @@ -1091,4 +1105,38 @@ mod test {

vector_common::assert_event_data_eq!(merged, expected);
}

#[test]
fn event_fields_iter() {
    let mut event = LogEvent::default();
    for (path, value) in [("a", 0), ("a.b", 1), ("c", 2)] {
        event.insert(path, value);
    }

    let collected: Vec<(String, Value)> = event
        .all_event_fields()
        .unwrap()
        .map(|(path, value)| (path, value.clone()))
        .collect();

    // Inserting "a.b" turns "a" into an object, so only leaf paths are yielded.
    let expected: Vec<(String, Value)> =
        vec![("a.b".to_string(), 1.into()), ("c".to_string(), 2.into())];
    assert_eq!(collected, expected);
}

#[test]
fn metadata_fields_iter() {
    let mut event = LogEvent::default();
    for (path, value) in [("%a", 0), ("%a.b", 1), ("%c", 2)] {
        event.insert(path, value);
    }

    let collected: Vec<(String, Value)> = event
        .all_metadata_fields()
        .unwrap()
        .map(|(path, value)| (path, value.clone()))
        .collect();

    // Metadata paths come back with the `%` prefix; "%a" became an object
    // after "%a.b" was inserted, so only leaf paths are yielded.
    let expected: Vec<(String, Value)> =
        vec![("%a.b".to_string(), 1.into()), ("%c".to_string(), 2.into())];
    assert_eq!(collected, expected);
}
}
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn event_iteration() {
log.insert("Pitbull", "The bigger they are, the harder they fall");

let all = log
.all_fields()
.all_event_fields()
.unwrap()
.map(|(k, v)| (k, v.to_string_lossy()))
.collect::<HashSet<_>>();
Expand All @@ -39,7 +39,7 @@ fn event_iteration_order() {
log.insert("o9amkaRY", Value::from("pGsfG7Nr"));
log.insert("YRjhxXcg", Value::from("nw8iM5Jr"));

let collected: Vec<_> = log.all_fields().unwrap().collect();
let collected: Vec<_> = log.all_event_fields().unwrap().collect();
assert_eq!(
collected,
vec![
Expand Down
4 changes: 2 additions & 2 deletions lib/vector-core/src/event/test/serialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn serialization() {
"timestamp": event.get(log_schema().timestamp_key().unwrap().to_string().as_str()),
});

let actual_all = serde_json::to_value(event.all_fields().unwrap()).unwrap();
let actual_all = serde_json::to_value(event.all_event_fields().unwrap()).unwrap();
assert_eq!(expected_all, actual_all);

let rfc3339_re = Regex::new(r"\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z\z").unwrap();
Expand All @@ -90,7 +90,7 @@ fn type_serialization() {
event.insert("bool", true);
event.insert("string", "thisisastring");

let map = serde_json::to_value(event.all_fields().unwrap()).unwrap();
let map = serde_json::to_value(event.all_event_fields().unwrap()).unwrap();
assert_eq!(map["float"], json!(5.5));
assert_eq!(map["int"], json!(4));
assert_eq!(map["bool"], json!(true));
Expand Down
51 changes: 50 additions & 1 deletion lib/vector-core/src/event/util/log/all_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
};

use serde::{Serialize, Serializer};
use vrl::path::PathPrefix;

use super::Value;

Expand All @@ -14,6 +15,12 @@ pub fn all_fields(fields: &BTreeMap<String, Value>) -> FieldsIter {
FieldsIter::new(fields)
}

/// Same functionality as `all_fields`, except each returned path is prepended
/// with `%`, the character denoting the metadata path type.
pub fn all_metadata_fields(fields: &BTreeMap<String, Value>) -> FieldsIter {
    FieldsIter::new_with_prefix(PathPrefix::Metadata, fields)
}

/// An iterator with a single "message" element
pub fn all_fields_non_object_root(value: &Value) -> FieldsIter {
FieldsIter::non_object(value)
Expand All @@ -37,15 +44,30 @@ enum PathComponent<'a> {
/// If a key maps to an empty collection, the key and the empty collection will be returned.
#[derive(Clone)]
pub struct FieldsIter<'a> {
/// If specified, this will be prepended to each path.
path_prefix: Option<PathPrefix>,
/// Stack of iterators used for the depth-first traversal.
stack: Vec<LeafIter<'a>>,
/// Path components from the root up to the top of the stack.
path: Vec<PathComponent<'a>>,
}

impl<'a> FieldsIter<'a> {
// TODO deprecate this in favor of `new_with_prefix`.
/// Build a depth-first iterator over `fields` with no path-type prefix
/// prepended to the emitted keys.
fn new(fields: &'a BTreeMap<String, Value>) -> FieldsIter<'a> {
    FieldsIter {
        path_prefix: None,
        stack: vec![LeafIter::Map(fields.iter())],
        path: vec![],
    }
}

fn new_with_prefix(
path_prefix: PathPrefix,
fields: &'a BTreeMap<String, Value>,
) -> FieldsIter<'a> {
FieldsIter {
path_prefix: Some(path_prefix),
stack: vec![LeafIter::Map(fields.iter())],
path: vec![],
}
Expand All @@ -55,6 +77,7 @@ impl<'a> FieldsIter<'a> {
/// will be treated as an object with a single "message" key
fn non_object(value: &'a Value) -> FieldsIter<'a> {
FieldsIter {
path_prefix: None,
stack: vec![LeafIter::Root((value, false))],
path: vec![],
}
Expand Down Expand Up @@ -82,7 +105,13 @@ impl<'a> FieldsIter<'a> {
}

fn make_path(&mut self, component: PathComponent<'a>) -> String {
let mut res = String::new();
let mut res = match self.path_prefix {
None => String::new(),
Some(prefix) => match prefix {
PathPrefix::Event => String::from("."),
PathPrefix::Metadata => String::from("%"),
},
};
let mut path_iter = self.path.iter().chain(iter::once(&component)).peekable();
loop {
match path_iter.next() {
Expand Down Expand Up @@ -177,6 +206,26 @@ mod test {
assert_eq!(collected, expected);
}

#[test]
fn metadata_keys_simple() {
    // Keys are yielded in sorted order regardless of insertion order,
    // each prefixed with `%` to denote the metadata path type.
    let fields = fields_from_json(json!({
        "field_1": 1,
        "field_0": 0,
        "field_2": 2
    }));

    let expected: Vec<(String, &Value)> = vec![
        ("%field_0".to_string(), &Value::Integer(0)),
        ("%field_1".to_string(), &Value::Integer(1)),
        ("%field_2".to_string(), &Value::Integer(2)),
    ];

    let collected: Vec<_> = all_metadata_fields(&fields).collect();
    assert_eq!(collected, expected);
}

#[test]
fn keys_nested() {
let fields = fields_from_json(json!({
Expand Down
2 changes: 1 addition & 1 deletion lib/vector-core/src/event/util/log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod all_fields;
mod keys;

pub use all_fields::{all_fields, all_fields_non_object_root};
pub use all_fields::{all_fields, all_fields_non_object_root, all_metadata_fields};
pub use keys::keys;

pub(self) use super::Value;
Expand Down
3 changes: 3 additions & 0 deletions lib/vector-lookup/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ serde = { version = "1.0.183", default-features = false, features = ["derive", "
vector-config = { path = "../vector-config" }
vector-config-macros = { path = "../vector-config-macros" }
vrl.workspace = true

[features]
test = []
7 changes: 7 additions & 0 deletions lib/vector-lookup/src/lookup_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,10 @@ impl<'a> TargetPath<'a> for &'a ConfigTargetPath {
&self.0.path
}
}

#[cfg(any(test, feature = "test"))]
impl From<&str> for ConfigTargetPath {
    /// Test-only convenience for building a `ConfigTargetPath` from a path literal.
    fn from(path: &str) -> Self {
        // Panicking on an invalid path is acceptable here: this impl is only
        // compiled for tests (cfg above), where a bad literal is a bug in the test.
        ConfigTargetPath::try_from(path.to_string()).unwrap()
    }
}
2 changes: 1 addition & 1 deletion src/codecs/encoding/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl Transformer {
TimestampFormat::Unix => {
if log.value().is_object() {
let mut unix_timestamps = Vec::new();
for (k, v) in log.all_fields().expect("must be an object") {
for (k, v) in log.all_event_fields().expect("must be an object") {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp())));
}
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/azure_blob/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn azure_blob_insert_json_into_blob() {
);
let expected = events
.iter()
.map(|event| serde_json::to_string(&event.as_log().all_fields().unwrap()).unwrap())
.map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
.collect::<Vec<_>>();
assert_eq!(expected, blob_lines);
}
Expand Down Expand Up @@ -179,7 +179,7 @@ async fn azure_blob_insert_json_into_blob_gzip() {
);
let expected = events
.iter()
.map(|event| serde_json::to_string(&event.as_log().all_fields().unwrap()).unwrap())
.map(|event| serde_json::to_string(&event.as_log().all_event_fields().unwrap()).unwrap())
.collect::<Vec<_>>();
assert_eq!(expected, blob_lines);
}
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/gcp/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ mod integration_tests {
for i in 0..input.len() {
let data = messages[i].message.decode_data();
let data = serde_json::to_value(data).unwrap();
let expected = serde_json::to_value(input[i].as_log().all_fields().unwrap()).unwrap();
let expected =
serde_json::to_value(input[i].as_log().all_event_fields().unwrap()).unwrap();
assert_eq!(data, expected);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/sources/dnstap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ mod integration_tests {
}

for event in events {
let json = serde_json::to_value(event.as_log().all_fields().unwrap()).unwrap();
let json = serde_json::to_value(event.as_log().all_event_fields().unwrap()).unwrap();
match query_event {
"query" => {
if json["messageType"] == json!("ClientQuery") {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ mod tests {
assert!(log.get(PID_KEY).is_some());
assert!(log.get_timestamp().is_some());

assert_eq!(8, log.all_fields().unwrap().count());
assert_eq!(8, log.all_event_fields().unwrap().count());
} else {
panic!("Expected to receive a linux event");
}
Expand Down
Loading