diff --git a/Cargo.lock b/Cargo.lock index 1f5bfb12895ae..b5e9cdd6a9be4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -172,9 +172,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.5.0" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" +checksum = "2ab91ebe16eb252986481c5b62f6098f3b698a45e34b5b98200cf20dd2484a44" dependencies = [ "anstyle 1.0.0", "anstyle-parse", @@ -216,9 +216,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "2.1.0" +version = "3.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" +checksum = "f0699d10d2f4d628a98ee7b57b289abbc98ff3bad977cb3152709d4bf2330628" dependencies = [ "anstyle 1.0.0", "windows-sys 0.48.0", @@ -500,7 +500,7 @@ dependencies = [ "fnv", "futures-util", "http", - "indexmap 2.0.1", + "indexmap 2.0.2", "mime", "multer", "num-traits", @@ -590,7 +590,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86046bbced96c0fab3ff5d2b3c769c0c55b0b3a7d67f9e2f2044f349f2e7d501" dependencies = [ "bytes 1.5.0", - "indexmap 2.0.1", + "indexmap 2.0.2", "serde", "serde_json", ] @@ -2061,9 +2061,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "824956d0dca8334758a5b7f7e50518d66ea319330cbceedcf76905c2f6ab30e3" +checksum = "d04704f56c2cde07f43e8e2c154b43f216dc5c92fc98ada720177362f953b956" dependencies = [ "clap_builder", "clap_derive", @@ -2075,15 +2075,15 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1eef05769009513df2eb1c3b4613e7fad873a14c600ff025b08f250f59fee7de" dependencies = [ - "clap 4.4.5", + "clap 4.4.6", "log", ] [[package]] name = "clap_builder" -version = "4.4.5" +version = "4.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "122ec64120a49b4563ccaedcbea7818d069ed8e9aa6d829b82d8a4128936b2ab" +checksum = "0e231faeaca65ebd1ea3c737966bf858971cd38c3849107aa3ea7de90a804e45" dependencies = [ "anstream", "anstyle 1.0.0", @@ -2098,7 +2098,7 @@ version = "4.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3ae8ba90b9d8b007efe66e55e48fb936272f5ca00349b5b0e89877520d35ea7" dependencies = [ - "clap 4.4.5", + "clap 4.4.6", ] [[package]] @@ -2231,9 +2231,9 @@ dependencies = [ [[package]] name = "community-id" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a97e158411823bf87634e2e47552f712e51fa4119cdb2255da799e7cb5c90a9" +checksum = "4f6af96839c04974cf381e427792a99913ecf3f7bfb348f153dc8a8e5f9803ad" dependencies = [ "anyhow", "base64 0.21.4", @@ -2296,7 +2296,7 @@ dependencies = [ "futures-core", "prost 0.12.1", "prost-types 0.12.1", - "tonic 0.10.1", + "tonic 0.10.2", "tracing-core 0.1.30", ] @@ -2318,7 +2318,7 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic 0.10.1", + "tonic 0.10.2", "tracing 0.1.37", "tracing-core 0.1.30", "tracing-subscriber", @@ -2427,7 +2427,7 @@ dependencies = [ "anes", "cast", "ciborium", - "clap 4.4.5", + "clap 4.4.6", "criterion-plot", "futures 0.3.28", "is-terminal", @@ -2569,9 +2569,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.2.2" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "626ae34994d3d8d668f4269922248239db4ae42d538b14c398b74a52208e8086" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" dependencies = [ "csv-core", "itoa", @@ -2581,9 +2581,9 @@ dependencies = [ [[package]] name = "csv-core" -version = "0.1.10" +version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" dependencies = [ "memchr", ] @@ -3373,7 +3373,7 @@ dependencies = [ "flate2", "futures 0.3.28", "glob", - "indexmap 2.0.1", + "indexmap 2.0.2", "libc", "quickcheck", "scan_fmt", @@ -4390,9 +4390,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.1" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad227c3af19d4914570ad36d30409928b75967c298feb9ea1969db3a610bb14e" +checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", "hashbrown 0.14.1", @@ -6145,9 +6145,9 @@ checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "pest" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7a4d085fd991ac8d5b05a147b437791b4260b76326baf0fc60cf7c9c27ecd33" +checksum = "c022f1e7b65d6a24c0dbbd5fb344c66881bc01f3e5ae74a1c8100f2f985d98a4" dependencies = [ "memchr", "thiserror", @@ -6156,9 +6156,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bee7be22ce7918f641a33f08e3f43388c7656772244e2bbb2477f44cc9021a" +checksum = "35513f630d46400a977c4cb58f78e1bfbe01434316e60c37d27b9ad6139c66d8" dependencies = [ "pest", "pest_generator", @@ -6166,9 +6166,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1511785c5e98d79a05e8a6bc34b4ac2168a0e3e92161862030ad84daa223141" +checksum = "bc9fc1b9e7057baba189b5c626e2d6f40681ae5b6eb064dc7c7834101ec8123a" dependencies = [ "pest", "pest_meta", @@ -6179,9 +6179,9 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.3" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42f0394d3123e33353ca5e1e89092e533d2cc490389f2bd6131c43c634ebc5f" +checksum = "1df74e9e7ec4053ceb980e7c0c8bd3594e977fde1af91daba9c928e8e8c6708d" dependencies = [ "once_cell", "pest", @@ -6602,7 +6602,7 @@ dependencies = [ name = "prometheus-parser" version = "0.1.0" dependencies = [ - "indexmap 2.0.1", + "indexmap 2.0.2", "nom", "num_enum 0.7.0", "prost 0.12.1", @@ -7402,9 +7402,9 @@ dependencies = [ [[package]] name = "roxmltree" -version = "0.18.0" +version = "0.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8f595a457b6b8c6cda66a48503e92ee8d19342f905948f29c383200ec9eb1d8" +checksum = "862340e351ce1b271a378ec53f304a5558f7db87f3769dc655a8f6ecbb68b302" dependencies = [ "xmlparser", ] @@ -7838,7 +7838,7 @@ version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ - "indexmap 2.0.1", + "indexmap 2.0.2", "itoa", "ryu", "serde", @@ -7925,7 +7925,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.0.1", + "indexmap 2.0.2", "serde", "serde_json", "serde_with_macros 3.3.0", @@ -7974,7 +7974,7 @@ version = "0.9.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a49e178e4452f45cb61d0cd8cebc1b0fafd3e41929e996cef79aa3aca91f574" dependencies = [ - "indexmap 2.0.1", + "indexmap 2.0.2", "itoa", "ryu", "serde", @@ -8903,7 +8903,7 @@ version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca676d9ba1a322c1b64eb8045a5ec5c0cfb0c9d08e15e9ff622589ad5221c8fe" dependencies = [ - "indexmap 2.0.1", + "indexmap 2.0.2", "serde", "serde_spanned", "toml_datetime", @@ -9582,7 +9582,7 @@ dependencies = [ "atty", "cached", "chrono", - "clap 4.4.5", + "clap 4.4.6", "clap-verbosity-flag", "clap_complete", "confy", @@ -9590,7 +9590,7 @@ dependencies = [ "dunce", "glob", "hex", - "indexmap 2.0.1", + "indexmap 2.0.2", "indicatif", "itertools 0.11.0", "log", @@ -9659,7 +9659,7 @@ dependencies = [ "bytesize", "chrono", "cidr-utils", - "clap 4.4.5", + "clap 4.4.6", "codecs", "colored", "console-subscriber", @@ -9697,7 +9697,7 @@ dependencies = [ "hyper", "hyper-openssl", "hyper-proxy", - "indexmap 2.0.1", + "indexmap 2.0.2", "indoc", "infer 0.15.0", "inventory", @@ -9820,7 +9820,7 @@ dependencies = [ "anyhow", "async-trait", "chrono", - "clap 4.4.5", + "clap 4.4.6", "futures 0.3.28", "graphql_client", "indoc", @@ -9843,7 +9843,7 @@ dependencies = [ "async-trait", "bytecheck", "bytes 1.5.0", - "clap 4.4.5", + "clap 4.4.6", "crc32fast", "criterion", "crossbeam-queue", @@ -9889,7 +9889,7 @@ dependencies = [ "crossbeam-utils", "derivative", "futures 0.3.28", - "indexmap 2.0.1", + "indexmap 2.0.2", "metrics", "nom", "ordered-float 4.1.0", @@ -9920,7 +9920,7 @@ dependencies = [ "chrono-tz", "encoding_rs", "http", - "indexmap 2.0.1", + "indexmap 2.0.2", "inventory", "no-proxy", "num-traits", @@ -9991,7 +9991,7 @@ dependencies = [ "headers", "http", "hyper-proxy", - "indexmap 2.0.1", + "indexmap 2.0.2", "metrics", "metrics-tracing-context", "metrics-util", @@ -10063,7 +10063,7 @@ dependencies = [ name = "vector-vrl-cli" version = "0.1.0" dependencies = [ - "clap 4.4.5", + "clap 4.4.6", "vector-vrl-functions", "vrl", ] @@ -10082,7 +10082,7 @@ dependencies = [ "ansi_term", "chrono", "chrono-tz", - "clap 4.4.5", + "clap 4.4.6", "enrichment", "glob", "prettydiff", @@ -10125,8 +10125,7 @@ checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" [[package]] name = "vrl" version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10d3c3ffe5322a8a7f63cf422aebb5000774e4152955fca1cd024c262893db5d" +source = "git+https://github.com/vectordotdev/vrl?branch=bruceg/keystring-experiment#4302535a5408d52d8f39bb5113a9dfa68f200c8f" dependencies = [ "aes", "ansi_term", @@ -10143,7 +10142,7 @@ dependencies = [ "chrono", "chrono-tz", "cidr-utils", - "clap 4.4.5", + "clap 4.4.6", "codespan-reporting", "community-id", "crypto_secretbox", @@ -10158,7 +10157,7 @@ dependencies = [ "hex", "hmac", "hostname", - "indexmap 2.0.1", + "indexmap 2.0.2", "indoc", "itertools 0.11.0", "lalrpop", @@ -10181,7 +10180,7 @@ dependencies = [ "quoted_printable", "rand 0.8.5", "regex", - "roxmltree 0.18.0", + "roxmltree 0.18.1", "rust_decimal", "rustyline", "seahash", @@ -10734,9 +10733,9 @@ dependencies = [ [[package]] name = "xmlparser" -version = "0.13.5" +version = "0.13.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d25c75bf9ea12c4040a97f829154768bbbce366287e2dc044af160cd79a13fd" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" [[package]] name = "yaml-rust" diff --git a/Cargo.toml b/Cargo.toml index feae5782770f8..acaf690c2cae9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,7 +118,7 @@ members = [ ] [workspace.dependencies] -vrl = { version = "0.7.0", default-features = false, features = ["cli", "test", "test_framework", "arbitrary", "compiler", "value", "diagnostic", "path", "parser", "stdlib", "datadog", "core"] } +vrl = { git = "https://github.com/vectordotdev/vrl", branch = "bruceg/keystring-experiment", default-features = false, features = ["cli", "test", "test_framework", "arbitrary", "compiler", "value", "diagnostic", "path", "parser", "stdlib", "datadog", "core"] } pin-project = { version = "1.1.3", default-features = false } @@ -255,9 +255,9 @@ bytes = { version = "1.5.0", default-features = false, features = ["serde"] } bytesize = { version = "1.3.0", default-features = false } chrono = { version = "0.4.31", default-features = false, features = ["serde"] } cidr-utils = { version = "0.5.11", default-features = false } -clap = { version = "4.4.3", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } +clap = { version = "4.4.6", default-features = false, features = ["derive", "error-context", "env", "help", "std", "string", "usage", "wrap_help"] } colored = { version = "2.0.4", default-features = false } -csv = { version = "1.2", default-features = false } +csv = { version = "1.3", default-features = false } derivative = { version = "2.2.0", default-features = false } dirs-next = { version = "2.0.0", default-features = false, optional = true } dyn-clone = { version = "1.0.14", default-features = false } @@ -374,7 +374,7 @@ tokio = { version = "1.32.0", features = ["test-util"] } tokio-test = "0.4.3" tower-test = "0.4.0" vector-core = { path = "lib/vector-core", default-features = false, features = ["vrl", "test"] } -vrl = { version = "0.7.0", features = ["cli", "test", "test_framework", "arbitrary"] } +vrl = { git = "https://github.com/vectordotdev/vrl", branch = "bruceg/keystring-experiment", default-features = false, features = ["cli", "test", "test_framework", "arbitrary"] } wiremock = "0.5.19" zstd = { version = "0.12.4", default-features = false } diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 24e2ff4280bab..5a7a283f8f04f 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -38,8 +38,8 @@ futures = { version = "0.3", default-features = false } indoc = { version = "2", default-features = false } tokio = { version = "1", features = ["test-util"] } similar-asserts = "1.5.0" -vrl = { version = "0.7.0", features = ["cli", "test", "test_framework", "arbitrary"] } vector-core = { path = "../vector-core", default-features = false, features = ["test"] } +vrl.workspace = true [features] syslog = ["dep:syslog_loose"] diff --git a/lib/codecs/src/decoding/format/protobuf.rs b/lib/codecs/src/decoding/format/protobuf.rs index 65be4990b0905..09b5459105c29 100644 --- a/lib/codecs/src/decoding/format/protobuf.rs +++ b/lib/codecs/src/decoding/format/protobuf.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeMap; use std::path::PathBuf; use bytes::Bytes; @@ -14,7 +13,7 @@ use vector_core::{ event::Event, schema, }; -use vrl::value::Kind; +use vrl::value::{Kind, ObjectMap}; use crate::common::protobuf::get_message_descriptor; @@ -171,11 +170,11 @@ fn to_vrl( } } prost_reflect::Value::Message(v) => { - let mut obj_map = BTreeMap::new(); + let mut obj_map = ObjectMap::new(); for field_desc in v.descriptor().fields() { let field_value = v.get_field(&field_desc); let out = to_vrl(field_value.as_ref(), Some(&field_desc))?; - obj_map.insert(field_desc.name().to_string(), out); + obj_map.insert(field_desc.name().into(), out); } vrl::value::Value::from(obj_map) } @@ -206,11 +205,11 @@ fn to_vrl( field_descriptor ) })? - .to_string(), + .into(), to_vrl(kv.1, Some(&message_desc.map_entry_value_field()))?, )) }) - .collect::>>()?, + .collect::>()?, ) } else { Err("Expected valid field descriptor")? diff --git a/lib/codecs/src/decoding/format/syslog.rs b/lib/codecs/src/decoding/format/syslog.rs index 041d6a4c93cc6..33ab5d1d05f36 100644 --- a/lib/codecs/src/decoding/format/syslog.rs +++ b/lib/codecs/src/decoding/format/syslog.rs @@ -4,13 +4,12 @@ use derivative::Derivative; use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath}; use smallvec::{smallvec, SmallVec}; use std::borrow::Cow; -use std::collections::BTreeMap; use syslog_loose::{IncompleteDate, Message, ProcId, Protocol, Variant}; use vector_config::configurable_component; use vector_core::config::{LegacyKey, LogNamespace}; use vector_core::{ config::{log_schema, DataType}, - event::{Event, LogEvent, Value}, + event::{Event, LogEvent, ObjectMap, Value}, schema, }; use vrl::value::{kind::Collection, Kind}; @@ -292,7 +291,7 @@ impl Deserializer for SyslogDeserializer { log } _ => { - let mut log = LogEvent::from(Value::Object(BTreeMap::new())); + let mut log = LogEvent::from(Value::Object(ObjectMap::new())); insert_fields_from_syslog(&mut log, parsed, log_namespace); log } @@ -402,15 +401,15 @@ fn insert_metadata_fields_from_syslog( ); } - let mut sdata: BTreeMap = BTreeMap::new(); + let mut sdata = ObjectMap::new(); for element in parsed.structured_data.into_iter() { - let mut data: BTreeMap = BTreeMap::new(); + let mut data = ObjectMap::new(); for (name, value) in element.params() { - data.insert(name.to_string(), value.into()); + data.insert(name.to_string().into(), value.into()); } - sdata.insert(element.id.to_string(), data.into()); + sdata.insert(element.id.into(), data.into()); } log_namespace.insert_source_metadata( @@ -474,9 +473,9 @@ fn insert_fields_from_syslog( } for element in parsed.structured_data.into_iter() { - let mut sdata: BTreeMap = BTreeMap::new(); + let mut sdata = ObjectMap::new(); for (name, value) in element.params() { - sdata.insert(name.to_string(), value.into()); + sdata.insert(name.to_string().into(), value.into()); } log.insert(event_path!(element.id), sdata); } diff --git a/lib/codecs/src/encoding/format/csv.rs b/lib/codecs/src/encoding/format/csv.rs index cc485139fdee7..f28a21b3faea2 100644 --- a/lib/codecs/src/encoding/format/csv.rs +++ b/lib/codecs/src/encoding/format/csv.rs @@ -309,20 +309,20 @@ mod tests { use chrono::DateTime; use ordered_float::NotNan; use vector_common::btreemap; - use vector_core::event::{LogEvent, Value}; + use vector_core::event::{LogEvent, ObjectMap, Value}; use super::*; fn make_event_with_fields(field_data: Vec<(&str, &str)>) -> (Vec, Event) { let mut fields: Vec = std::vec::Vec::new(); - let mut tree = std::collections::BTreeMap::new(); + let mut tree = ObjectMap::new(); - for (field_name, field_value) in field_data.iter() { - let field = ConfigTargetPath::try_from(field_name.to_string()).unwrap(); + for (field_name, field_value) in field_data.into_iter() { + let field = ConfigTargetPath::try_from(field_name.clone()).unwrap(); fields.push(field); let field_value = Value::from(field_value.to_string()); - tree.insert(field_name.to_string().clone(), field_value); + tree.insert(field_name.into(), field_value); } let event = Event::Log(LogEvent::from(tree)); diff --git a/lib/codecs/src/encoding/format/gelf.rs b/lib/codecs/src/encoding/format/gelf.rs index d67ab918060f2..ea720f43bdb4a 100644 --- a/lib/codecs/src/encoding/format/gelf.rs +++ b/lib/codecs/src/encoding/format/gelf.rs @@ -8,9 +8,7 @@ use snafu::Snafu; use tokio_util::codec::Encoder; use vector_core::{ config::{log_schema, DataType}, - event::Event, - event::LogEvent, - event::Value, + event::{Event, KeyString, LogEvent, Value}, schema, }; @@ -26,7 +24,7 @@ use vector_core::{ #[derive(Debug, Snafu)] pub enum GelfSerializerError { #[snafu(display(r#"LogEvent does not contain required field: "{}""#, field))] - MissingField { field: String }, + MissingField { field: KeyString }, #[snafu(display( r#"LogEvent contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#, field, @@ -158,7 +156,7 @@ fn coerce_field_names_and_values( let mut missing_prefix = vec![]; if let Some(event_data) = log.as_map_mut() { for (field, value) in event_data.iter_mut() { - match field.as_str() { + match &field[..] { VERSION | HOST | SHORT_MESSAGE | FULL_MESSAGE | FACILITY | FILE => { if !value.is_bytes() { err_invalid_type(field, "UTF-8 string", value.kind_str())?; @@ -194,9 +192,11 @@ fn coerce_field_names_and_values( _ => { // additional fields must be only word chars, dashes and periods. if !VALID_FIELD_REGEX.is_match(field) { - return MissingFieldSnafu { field } - .fail() - .map_err(|e| e.to_string().into()); + return MissingFieldSnafu { + field: field.clone(), + } + .fail() + .map_err(|e| e.to_string().into()); } // additional field values must be only strings or numbers @@ -238,20 +238,15 @@ fn to_gelf_event(log: LogEvent) -> vector_common::Result { #[cfg(test)] mod tests { - use std::collections::BTreeMap; - use crate::encoding::SerializerConfig; use super::*; use chrono::NaiveDateTime; use vector_core::event::{Event, EventMetadata}; use vrl::btreemap; - use vrl::value::Value; + use vrl::value::{ObjectMap, Value}; - fn do_serialize( - expect_success: bool, - event_fields: BTreeMap, - ) -> Option { + fn do_serialize(expect_success: bool, event_fields: ObjectMap) -> Option { let config = GelfSerializerConfig::new(); let mut serializer = config.build(); let event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into(); diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index a67a631edf3eb..e900be821bdd9 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -116,7 +116,7 @@ fn convert_value_raw( for (key, val) in o.into_iter() { match convert_value(&value_field, val) { Ok(prost_val) => { - map.insert(MapKey::String(key), prost_val); + map.insert(MapKey::String(key.into()), prost_val); } Err(e) => return Err(e), } diff --git a/lib/enrichment/src/find_enrichment_table_records.rs b/lib/enrichment/src/find_enrichment_table_records.rs index 7086709cd208a..f2ec4313b60d5 100644 --- a/lib/enrichment/src/find_enrichment_table_records.rs +++ b/lib/enrichment/src/find_enrichment_table_records.rs @@ -132,7 +132,7 @@ impl Function for FindEnrichmentTableRecords { #[derive(Debug, Clone)] pub struct FindEnrichmentTableRecordsFn { table: String, - condition: BTreeMap, + condition: BTreeMap, index: Option, select: Option>, case_sensitive: Case, @@ -203,7 +203,7 @@ mod tests { }; let tz = TimeZone::default(); - let object: Value = BTreeMap::new().into(); + let object: Value = ObjectMap::new().into(); let mut target = TargetValue { value: object, metadata: value!({}), diff --git a/lib/enrichment/src/get_enrichment_table_record.rs b/lib/enrichment/src/get_enrichment_table_record.rs index a028894d89373..ddd99c99c804b 100644 --- a/lib/enrichment/src/get_enrichment_table_record.rs +++ b/lib/enrichment/src/get_enrichment_table_record.rs @@ -124,7 +124,7 @@ impl Function for GetEnrichmentTableRecord { #[derive(Debug, Clone)] pub struct GetEnrichmentTableRecordFn { table: String, - condition: BTreeMap, + condition: BTreeMap, index: Option, select: Option>, case_sensitive: Case, diff --git a/lib/enrichment/src/lib.rs b/lib/enrichment/src/lib.rs index 625c4780900a5..52f2a547f50f1 100644 --- a/lib/enrichment/src/lib.rs +++ b/lib/enrichment/src/lib.rs @@ -7,12 +7,11 @@ pub mod tables; #[cfg(test)] mod test_util; mod vrl_util; -use std::collections::BTreeMap; use dyn_clone::DynClone; pub use tables::{TableRegistry, TableSearch}; use vrl::compiler::Function; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct IndexHandle(pub usize); @@ -49,7 +48,7 @@ pub trait Table: DynClone { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result, String>; + ) -> Result; /// Search the enrichment table data with the given condition. /// All conditions must match (AND). @@ -60,7 +59,7 @@ pub trait Table: DynClone { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result>, String>; + ) -> Result, String>; /// Hints to the enrichment table what data is going to be searched to allow it to index the /// data in advance. diff --git a/lib/enrichment/src/tables.rs b/lib/enrichment/src/tables.rs index fb43c3a7d0205..6f494491686f8 100644 --- a/lib/enrichment/src/tables.rs +++ b/lib/enrichment/src/tables.rs @@ -30,12 +30,12 @@ //! can be searched. use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, sync::{Arc, Mutex}, }; use arc_swap::ArcSwap; -use vrl::value::Value; +use vrl::value::ObjectMap; use super::{Condition, IndexHandle, Table}; use crate::Case; @@ -205,7 +205,7 @@ impl TableSearch { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result, String> { + ) -> Result { let tables = self.0.load(); if let Some(ref tables) = **tables { match tables.get(table) { @@ -227,7 +227,7 @@ impl TableSearch { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result>, String> { + ) -> Result, String> { let tables = self.0.load(); if let Some(ref tables) = **tables { match tables.get(table) { @@ -357,7 +357,7 @@ mod tests { registry.finish_load(); assert_eq!( - Ok(BTreeMap::from([("field".into(), Value::from("result"))])), + Ok(ObjectMap::from([("field".into(), Value::from("result"))])), tables_search.find_table_row( "dummy1", Case::Sensitive, @@ -410,8 +410,8 @@ mod tests { // After we finish load there are no tables in the list assert!(registry.table_ids().is_empty()); - let mut new_data = BTreeMap::new(); - new_data.insert("thing".to_string(), Value::Null); + let mut new_data = ObjectMap::new(); + new_data.insert("thing".into(), Value::Null); let mut tables: TableMap = HashMap::new(); tables.insert( diff --git a/lib/enrichment/src/test_util.rs b/lib/enrichment/src/test_util.rs index cf634f2102696..1eed09e200ca6 100644 --- a/lib/enrichment/src/test_util.rs +++ b/lib/enrichment/src/test_util.rs @@ -1,15 +1,15 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, sync::{Arc, Mutex}, }; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; use crate::{Case, Condition, IndexHandle, Table, TableRegistry}; #[derive(Debug, Clone)] pub(crate) struct DummyEnrichmentTable { - data: BTreeMap, + data: ObjectMap, indexes: Arc>>>, } @@ -20,12 +20,12 @@ impl DummyEnrichmentTable { pub(crate) fn new_with_index(indexes: Arc>>>) -> Self { Self { - data: BTreeMap::from([("field".to_string(), Value::from("result"))]), + data: ObjectMap::from([("field".into(), Value::from("result"))]), indexes, } } - pub(crate) fn new_with_data(data: BTreeMap) -> Self { + pub(crate) fn new_with_data(data: ObjectMap) -> Self { Self { data, indexes: Default::default(), @@ -40,7 +40,7 @@ impl Table for DummyEnrichmentTable { _condition: &[Condition], _select: Option<&[String]>, _index: Option, - ) -> Result, String> { + ) -> Result { Ok(self.data.clone()) } @@ -50,7 +50,7 @@ impl Table for DummyEnrichmentTable { _condition: &[Condition], _select: Option<&[String]>, _index: Option, - ) -> Result>, String> { + ) -> Result, String> { Ok(vec![self.data.clone()]) } diff --git a/lib/enrichment/src/vrl_util.rs b/lib/enrichment/src/vrl_util.rs index ccdebd3cdfbb1..61e7044c53c99 100644 --- a/lib/enrichment/src/vrl_util.rs +++ b/lib/enrichment/src/vrl_util.rs @@ -64,7 +64,7 @@ pub(crate) fn add_index( registry: &mut TableRegistry, tablename: &str, case: Case, - condition: &BTreeMap, + condition: &BTreeMap, ) -> std::result::Result { let fields = condition .iter() @@ -112,10 +112,8 @@ mod tests { #[test] fn add_indexes() { let mut registry = test_util::get_table_registry(); - let conditions = BTreeMap::from([( - "field".to_owned(), - expression::Literal::from("value").into(), - )]); + let conditions = + BTreeMap::from([("field".into(), expression::Literal::from("value").into())]); let index = add_index(&mut registry, "dummy1", Case::Insensitive, &conditions).unwrap(); assert_eq!(IndexHandle(0), index); diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index 875be35d4dd9c..d1f91798f44c6 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -2,12 +2,11 @@ use bytes::Bytes; use chrono::{DateTime, TimeZone, Utc}; use lookup::path; use ordered_float::NotNan; -use std::collections::BTreeMap; use vector_core::{ config::{log_schema, LegacyKey, LogNamespace}, event::{Event, LogEvent}, }; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; use super::proto::{ common::v1::{any_value::Value as PBValue, KeyValue}, @@ -73,10 +72,14 @@ fn kv_list_into_value(arr: Vec) -> Value { Value::Object( arr.into_iter() .filter_map(|kv| { - kv.value - .map(|av| (kv.key, av.value.map(Into::into).unwrap_or(Value::Null))) + kv.value.map(|av| { + ( + kv.key.into(), + av.value.map(Into::into).unwrap_or(Value::Null), + ) + }) }) - .collect::>(), + .collect::(), ) } diff --git a/lib/vector-common/src/byte_size_of.rs b/lib/vector-common/src/byte_size_of.rs index 0b984ac41dfa3..b0de324465f86 100644 --- a/lib/vector-common/src/byte_size_of.rs +++ b/lib/vector-common/src/byte_size_of.rs @@ -7,7 +7,7 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Utc}; use serde_json::{value::RawValue, Value as JsonValue}; use smallvec::SmallVec; -use vrl::value::Value; +use vrl::value::{KeyString, Value}; pub trait ByteSizeOf { /// Returns the in-memory size of this type @@ -58,6 +58,12 @@ impl ByteSizeOf for String { } } +impl ByteSizeOf for KeyString { + fn allocated_bytes(&self) -> usize { + self.len() + } +} + impl<'a> ByteSizeOf for &'a str { fn allocated_bytes(&self) -> usize { 0 diff --git a/lib/vector-config/src/stdlib.rs b/lib/vector-config/src/stdlib.rs index 7958c42d62609..ad3ae47aac32d 100644 --- a/lib/vector-config/src/stdlib.rs +++ b/lib/vector-config/src/stdlib.rs @@ -14,6 +14,7 @@ use std::{ use indexmap::IndexMap; use serde_json::{Number, Value}; use vector_config_common::{attributes::CustomAttribute, constants, validation::Validation}; +use vrl::value::KeyString; use crate::{ num::ConfigurableNumber, @@ -113,6 +114,22 @@ impl ToValue for String { } } +impl Configurable for KeyString { + fn metadata() -> Metadata { + Metadata::with_transparent(true) + } + + fn generate_schema(_: &RefCell) -> Result { + Ok(generate_string_schema()) + } +} + +impl ToValue for KeyString { + fn to_value(&self) -> Value { + Value::String(self.clone().into()) + } +} + impl Configurable for char { fn metadata() -> Metadata { let mut metadata = Metadata::with_transparent(true); diff --git a/lib/vector-config/src/str.rs b/lib/vector-config/src/str.rs index d1f303158d4d4..223a6019f43ca 100644 --- a/lib/vector-config/src/str.rs +++ b/lib/vector-config/src/str.rs @@ -1,3 +1,5 @@ +use vrl::value::KeyString; + use crate::Configurable; /// A string-like type that can be represented in a Vector configuration. @@ -10,3 +12,5 @@ use crate::Configurable; pub trait ConfigurableString: Configurable + ToString {} impl ConfigurableString for String {} + +impl ConfigurableString for KeyString {} diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index 9bd5d5a64bb09..34e165a0f8e66 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -95,7 +95,7 @@ rand = "0.8.5" rand_distr = "0.4.3" tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt", "ansi", "registry"] } vector-common = { path = "../vector-common", default-features = false, features = ["test"] } -vrl = { version = "0.7.0", features = ["cli", "test", "test_framework", "arbitrary"] } +vrl.workspace = true [features] api = ["dep:async-graphql"] diff --git a/lib/vector-core/src/event/discriminant.rs b/lib/vector-core/src/event/discriminant.rs index 98ff1d996ba12..b1bd9589ebb81 100644 --- a/lib/vector-core/src/event/discriminant.rs +++ b/lib/vector-core/src/event/discriminant.rs @@ -1,9 +1,6 @@ -use std::{ - collections::BTreeMap, - hash::{Hash, Hasher}, -}; +use std::hash::{Hash, Hasher}; -use super::{LogEvent, Value}; +use super::{LogEvent, ObjectMap, Value}; // TODO: if we had `Value` implement `Eq` and `Hash`, the implementation here // would be much easier. The issue is with `f64` type. We should consider using @@ -98,7 +95,7 @@ fn array_eq(this: &[Value], other: &[Value]) -> bool { .all(|(first, second)| value_eq(first, second)) } -fn map_eq(this: &BTreeMap, other: &BTreeMap) -> bool { +fn map_eq(this: &ObjectMap, other: &ObjectMap) -> bool { if this.len() != other.len() { return false; } @@ -150,7 +147,7 @@ fn hash_array(hasher: &mut H, array: &[Value]) { } } -fn hash_map(hasher: &mut H, map: &BTreeMap) { +fn hash_map(hasher: &mut H, map: &ObjectMap) { for (key, val) in map { hasher.write(key.as_bytes()); hash_value(hasher, val); diff --git a/lib/vector-core/src/event/estimated_json_encoded_size_of.rs b/lib/vector-core/src/event/estimated_json_encoded_size_of.rs index 9bfa3bf50a8b7..2e848e7c84648 100644 --- a/lib/vector-core/src/event/estimated_json_encoded_size_of.rs +++ b/lib/vector-core/src/event/estimated_json_encoded_size_of.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, Timelike, Utc}; use ordered_float::NotNan; use smallvec::SmallVec; use vector_common::json_size::JsonSize; -use vrl::value::Value; +use vrl::value::{KeyString, Value}; const NULL_SIZE: JsonSize = JsonSize::new(4); const TRUE_SIZE: JsonSize = JsonSize::new(4); @@ -100,6 +100,12 @@ impl EstimatedJsonEncodedSizeOf for String { } } +impl EstimatedJsonEncodedSizeOf for KeyString { + fn estimated_json_encoded_size_of(&self) -> JsonSize { + self.as_str().estimated_json_encoded_size_of() + } +} + impl EstimatedJsonEncodedSizeOf for Bytes { fn estimated_json_encoded_size_of(&self) -> JsonSize { JsonSize::new(QUOTES_SIZE + self.len()) diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index f27f2b7af64fb..d00a55fb68339 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use chrono::Utc; use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, convert::{TryFrom, TryInto}, fmt::Debug, iter::FromIterator, @@ -12,7 +12,8 @@ use std::{ use crossbeam_utils::atomic::AtomicCell; use lookup::lookup_v2::TargetPath; -use lookup::PathPrefix; +use lookup::{metadata_path, path, PathPrefix}; +use once_cell::sync::Lazy; use serde::{Deserialize, Serialize, Serializer}; use vector_common::{ internal_event::{OptionalTag, TaggedEventsSent}, @@ -21,20 +22,18 @@ use vector_common::{ EventDataEq, }; use vrl::path::{parse_target_path, OwnedTargetPath, PathParseError}; +use vrl::{event_path, owned_value_path}; use super::{ estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf, finalization::{BatchNotifier, EventFinalizer}, metadata::EventMetadata, - util, EventFinalizers, Finalizable, Value, + util, EventFinalizers, Finalizable, KeyString, ObjectMap, Value, }; use crate::config::LogNamespace; use crate::config::{log_schema, telemetry}; use crate::event::util::log::{all_fields, all_metadata_fields}; use crate::{event::MaybeAsLogMut, ByteSizeOf}; -use lookup::{metadata_path, path}; -use once_cell::sync::Lazy; -use vrl::{event_path, owned_value_path}; static VECTOR_SOURCE_TYPE_PATH: Lazy> = Lazy::new(|| { Some(OwnedTargetPath::metadata(owned_value_path!( @@ -263,8 +262,8 @@ impl LogEvent { } } - /// Create a `LogEvent` from a `BTreeMap` and `EventMetadata` - pub fn from_map(map: BTreeMap, metadata: EventMetadata) -> Self { + /// Create a `LogEvent` from an `ObjectMap` and `EventMetadata` + pub fn from_map(map: ObjectMap, metadata: EventMetadata) -> Self { let inner = Arc::new(Inner::from(Value::Object(map))); Self { inner, metadata } } @@ -414,7 +413,7 @@ impl LogEvent { } } - pub fn keys(&self) -> Option + '_> { + pub fn keys(&self) -> Option + '_> { match &self.inner.fields { Value::Object(map) => Some(util::log::keys(map)), _ => None, @@ -423,7 +422,9 @@ impl LogEvent { /// If the event root value is a map, build and return an iterator to event field and value pairs. /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods. - pub fn all_event_fields(&self) -> Option + Serialize> { + pub fn all_event_fields( + &self, + ) -> Option + Serialize> { self.as_map().map(all_fields) } @@ -431,7 +432,7 @@ impl LogEvent { /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods. pub fn all_metadata_fields( &self, - ) -> Option + Serialize> { + ) -> Option + Serialize> { match self.metadata.value() { Value::Object(metadata_map) => Some(metadata_map).map(all_metadata_fields), _ => None, @@ -440,7 +441,7 @@ impl LogEvent { /// Returns an iterator of all fields if the value is an Object. Otherwise, /// a single field is returned with a "message" key - pub fn convert_to_fields(&self) -> impl Iterator + Serialize { + pub fn convert_to_fields(&self) -> impl Iterator + Serialize { if let Some(map) = self.as_map() { util::log::all_fields(map) } else { @@ -456,14 +457,14 @@ impl LogEvent { } } - pub fn as_map(&self) -> Option<&BTreeMap> { + pub fn as_map(&self) -> Option<&ObjectMap> { match self.value() { Value::Object(map) => Some(map), _ => None, } } - pub fn as_map_mut(&mut self) -> Option<&mut BTreeMap> { + pub fn as_map_mut(&mut self) -> Option<&mut ObjectMap> { match self.value_mut() { Value::Object(map) => Some(map), _ => None, @@ -633,16 +634,16 @@ impl From for LogEvent { } } -impl From> for LogEvent { - fn from(map: BTreeMap) -> Self { +impl From for LogEvent { + fn from(map: ObjectMap) -> Self { Self::from_parts(Value::Object(map), EventMetadata::default()) } } -impl From> for LogEvent { - fn from(map: HashMap) -> Self { +impl From> for LogEvent { + fn from(map: HashMap) -> Self { Self::from_parts( - Value::Object(map.into_iter().collect::>()), + Value::Object(map.into_iter().collect::()), EventMetadata::default(), ) } @@ -656,8 +657,8 @@ impl TryFrom for LogEvent { serde_json::Value::Object(fields) => Ok(LogEvent::from( fields .into_iter() - .map(|(k, v)| (k, v.into())) - .collect::>(), + .map(|(k, v)| (k.into(), v.into())) + .collect::(), )), _ => Err(crate::Error::from( "Attempted to convert non-Object JSON into a LogEvent.", @@ -1112,14 +1113,14 @@ mod test { log.insert("a", 0); log.insert("a.b", 1); log.insert("c", 2); - let actual: Vec<(String, Value)> = log + let actual: Vec<(KeyString, Value)> = log .all_event_fields() .unwrap() .map(|(s, v)| (s, v.clone())) .collect(); assert_eq!( actual, - vec![("a.b".to_string(), 1.into()), ("c".to_string(), 2.into())] + vec![("a.b".into(), 1.into()), ("c".into(), 2.into())] ); } @@ -1129,14 +1130,14 @@ mod test { log.insert("%a", 0); log.insert("%a.b", 1); log.insert("%c", 2); - let actual: Vec<(String, Value)> = log + let actual: Vec<(KeyString, Value)> = log .all_metadata_fields() .unwrap() .map(|(s, v)| (s, v.clone())) .collect(); assert_eq!( actual, - vec![("%a.b".to_string(), 1.into()), ("%c".to_string(), 2.into())] + vec![("%a.b".into(), 1.into()), ("%c".into(), 2.into())] ); } } diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index a592a328a9c8d..ad5588d083c2b 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -1,12 +1,12 @@ #![deny(missing_docs)] -use std::{collections::BTreeMap, sync::Arc}; +use std::sync::Arc; use serde::{Deserialize, Serialize}; use vector_common::{config::ComponentKey, EventDataEq}; -use vrl::value::{Kind, Secrets, Value}; +use vrl::value::{KeyString, Kind, Secrets, Value}; -use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus}; +use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, ObjectMap}; use crate::{ config::{LogNamespace, OutputId}, schema, ByteSizeOf, @@ -56,7 +56,7 @@ pub struct EventMetadata { /// we need to ensure it is still available later on for emitting metrics tagged by the service. /// This field could almost be keyed by `&'static str`, but because it needs to be deserializable /// we have to use `String`. - dropped_fields: BTreeMap, + dropped_fields: ObjectMap, /// Metadata to track the origin of metrics. This is always `None` for log and trace events. /// Only a small set of Vector sources and transforms explicitly set this field. @@ -114,7 +114,7 @@ impl DatadogMetricOriginMetadata { } fn default_metadata_value() -> Value { - Value::Object(BTreeMap::new()) + Value::Object(ObjectMap::new()) } impl EventMetadata { @@ -204,7 +204,7 @@ impl EventMetadata { /// There is currently no way to remove a field from this list, so if a field is dropped /// and then the field is re-added with a new value - the dropped value will still be /// retrieved. - pub fn add_dropped_field(&mut self, meaning: String, value: Value) { + pub fn add_dropped_field(&mut self, meaning: KeyString, value: Value) { self.dropped_fields.insert(meaning, value); } @@ -222,14 +222,14 @@ impl EventMetadata { impl Default for EventMetadata { fn default() -> Self { Self { - value: Value::Object(BTreeMap::new()), + value: Value::Object(ObjectMap::new()), secrets: Secrets::new(), finalizers: Default::default(), schema_definition: default_schema_definition(), source_id: None, source_type: None, upstream_id: None, - dropped_fields: BTreeMap::new(), + dropped_fields: ObjectMap::new(), datadog_origin_metadata: None, } } diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index c96198f27d1bb..ce459577948a0 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, convert::TryInto, fmt::Debug, sync::Arc}; +use std::{convert::TryInto, fmt::Debug, sync::Arc}; use crate::config::LogNamespace; use crate::{config::OutputId, ByteSizeOf}; @@ -19,7 +19,7 @@ use vector_common::{ config::ComponentKey, finalization, internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags, EventDataEq, }; -pub use vrl::value::Value; +pub use vrl::value::{KeyString, ObjectMap, Value}; #[cfg(feature = "vrl")] pub use vrl_target::{TargetEvents, VrlTarget}; @@ -340,8 +340,8 @@ impl Event { serde_json::Value::Object(fields) => Ok(LogEvent::from( fields .into_iter() - .map(|(k, v)| (k, v.into())) - .collect::>(), + .map(|(k, v)| (k.into(), v.into())) + .collect::(), ) .into()), _ => Err(crate::Error::from( diff --git a/lib/vector-core/src/event/proto.rs b/lib/vector-core/src/event/proto.rs index 7dfb59c32d5b5..c77f29bed2458 100644 --- a/lib/vector-core/src/event/proto.rs +++ b/lib/vector-core/src/event/proto.rs @@ -1,8 +1,10 @@ +use std::collections::BTreeMap; + use chrono::TimeZone; use ordered_float::NotNan; use crate::{ - event::{self, BTreeMap, MetricTags, WithMetadata}, + event::{self, MetricTags, ObjectMap, WithMetadata}, metrics::AgentDDSketch, }; @@ -98,8 +100,8 @@ impl From for event::LogEvent { let fields = log .fields .into_iter() - .filter_map(|(k, v)| decode_value(v).map(|value| (k, value))) - .collect::>(); + .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value))) + .collect::(); Self::from(fields) }; @@ -118,8 +120,8 @@ impl From for event::TraceEvent { let fields = trace .fields .into_iter() - .filter_map(|(k, v)| decode_value(v).map(|value| (k, value))) - .collect::>(); + .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value))) + .collect::(); let mut log = event::LogEvent::from(fields); if let Some(metadata_value) = trace.metadata { @@ -305,8 +307,8 @@ impl From for WithMetadata { Log { fields: fields .into_iter() - .map(|(k, v)| (k, encode_value(v))) - .collect::>(), + .map(|(k, v)| (k.into(), encode_value(v))) + .collect::>(), value: None, metadata: Some(encode_value(metadata.value().clone())), } @@ -331,7 +333,7 @@ impl From for WithMetadata { let (fields, metadata) = trace.into_parts(); let fields = fields .into_iter() - .map(|(k, v)| (k, encode_value(v))) + .map(|(k, v)| (k.into(), encode_value(v))) .collect::>(); let data = Trace { @@ -613,8 +615,8 @@ fn decode_value(input: Value) -> Option { fn decode_map(fields: BTreeMap) -> Option { fields .into_iter() - .map(|(key, value)| decode_value(value).map(|value| (key, value))) - .collect::>>() + .map(|(key, value)| decode_value(value).map(|value| (key.into(), value))) + .collect::>() .map(event::Value::Object) } @@ -660,11 +662,11 @@ fn encode_value(value: event::Value) -> Value { } } -fn encode_map(fields: BTreeMap) -> ValueMap { +fn encode_map(fields: ObjectMap) -> ValueMap { ValueMap { fields: fields .into_iter() - .map(|(key, value)| (key, encode_value(value))) + .map(|(key, value)| (key.into(), encode_value(value))) .collect(), } } diff --git a/lib/vector-core/src/event/test/common.rs b/lib/vector-core/src/event/test/common.rs index c7ccca9521f70..575d0c4853fcc 100644 --- a/lib/vector-core/src/event/test/common.rs +++ b/lib/vector-core/src/event/test/common.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - iter, -}; +use std::{collections::BTreeSet, iter}; use chrono::{DateTime, NaiveDateTime, Utc}; use quickcheck::{empty_shrinker, Arbitrary, Gen}; @@ -12,8 +9,8 @@ use crate::{ Bucket, MetricData, MetricName, MetricSeries, MetricSketch, MetricTags, MetricTime, Quantile, Sample, }, - Event, EventMetadata, LogEvent, Metric, MetricKind, MetricValue, StatisticKind, TraceEvent, - Value, + Event, EventMetadata, LogEvent, Metric, MetricKind, MetricValue, ObjectMap, StatisticKind, + TraceEvent, Value, }, metrics::AgentDDSketch, }; @@ -84,7 +81,7 @@ impl Arbitrary for Event { impl Arbitrary for LogEvent { fn arbitrary(g: &mut Gen) -> Self { let mut gen = Gen::new(MAX_MAP_SIZE); - let map: BTreeMap = BTreeMap::arbitrary(&mut gen); + let map: ObjectMap = ObjectMap::arbitrary(&mut gen); let metadata: EventMetadata = EventMetadata::arbitrary(g); LogEvent::from_map(map, metadata) } diff --git a/lib/vector-core/src/event/test/mod.rs b/lib/vector-core/src/event/test/mod.rs index d1dda8523f6ab..d881ff0c4bd17 100644 --- a/lib/vector-core/src/event/test/mod.rs +++ b/lib/vector-core/src/event/test/mod.rs @@ -43,9 +43,9 @@ fn event_iteration_order() { assert_eq!( collected, vec![ - (String::from("YRjhxXcg"), &Value::from("nw8iM5Jr")), - (String::from("lZDfzKIL"), &Value::from("tOVrjveM")), - (String::from("o9amkaRY"), &Value::from("pGsfG7Nr")), + ("YRjhxXcg", &Value::from("nw8iM5Jr")), + ("lZDfzKIL", &Value::from("tOVrjveM")), + ("o9amkaRY", &Value::from("pGsfG7Nr")), ] ); } diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index 120c6c55f490b..5dcee18028221 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, fmt::Debug}; +use std::fmt::Debug; use lookup::lookup_v2::TargetPath; use serde::{Deserialize, Serialize}; @@ -11,7 +11,7 @@ use vrl::path::PathParseError; use super::{ BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata, - Finalizable, LogEvent, Value, + Finalizable, LogEvent, ObjectMap, Value, }; use crate::ByteSizeOf; @@ -24,13 +24,13 @@ impl TraceEvent { /// # Panics /// /// Panics if the fields of the `TraceEvent` are not a `Value::Map`. - pub fn into_parts(self) -> (BTreeMap, EventMetadata) { + pub fn into_parts(self) -> (ObjectMap, EventMetadata) { let (value, metadata) = self.0.into_parts(); let map = value.into_object().expect("inner value must be a map"); (map, metadata) } - pub fn from_parts(fields: BTreeMap, metadata: EventMetadata) -> Self { + pub fn from_parts(fields: ObjectMap, metadata: EventMetadata) -> Self { Self(LogEvent::from_map(fields, metadata)) } @@ -64,11 +64,11 @@ impl TraceEvent { Self(self.0.with_batch_notifier_option(batch)) } - /// Convert a `TraceEvent` into a `BTreeMap` of it's fields + /// Convert a `TraceEvent` into an `ObjectMap` of it's fields /// # Panics /// /// Panics if the fields of the `TraceEvent` are not a `Value::Map`. - pub fn as_map(&self) -> &BTreeMap { + pub fn as_map(&self) -> &ObjectMap { self.0.as_map().expect("inner value must be a map") } @@ -121,8 +121,8 @@ impl From for TraceEvent { } } -impl From> for TraceEvent { - fn from(map: BTreeMap) -> Self { +impl From for TraceEvent { + fn from(map: ObjectMap) -> Self { Self(map.into()) } } diff --git a/lib/vector-core/src/event/util/log/all_fields.rs b/lib/vector-core/src/event/util/log/all_fields.rs index 9cc9cc2043b31..7c69aeadf71a6 100644 --- a/lib/vector-core/src/event/util/log/all_fields.rs +++ b/lib/vector-core/src/event/util/log/all_fields.rs @@ -1,23 +1,19 @@ -use std::{ - collections::{btree_map, BTreeMap}, - fmt::Write as _, - iter, slice, -}; +use std::{collections::btree_map, fmt::Write as _, iter, slice}; use serde::{Serialize, Serializer}; use vrl::path::PathPrefix; -use super::Value; +use crate::event::{KeyString, ObjectMap, Value}; /// Iterates over all paths in form `a.b[0].c[1]` in alphabetical order /// and their corresponding values. -pub fn all_fields(fields: &BTreeMap) -> FieldsIter { +pub fn all_fields(fields: &ObjectMap) -> FieldsIter { FieldsIter::new(fields) } /// Same functionality as `all_fields` but it prepends a character that denotes the /// path type. -pub fn all_metadata_fields(fields: &BTreeMap) -> FieldsIter { +pub fn all_metadata_fields(fields: &ObjectMap) -> FieldsIter { FieldsIter::new_with_prefix(PathPrefix::Metadata, fields) } @@ -29,13 +25,13 @@ pub fn all_fields_non_object_root(value: &Value) -> FieldsIter { #[derive(Clone, Debug)] enum LeafIter<'a> { Root((&'a Value, bool)), - Map(btree_map::Iter<'a, String, Value>), + Map(btree_map::Iter<'a, KeyString, Value>), Array(iter::Enumerate>), } #[derive(Clone, Copy)] enum PathComponent<'a> { - Key(&'a String), + Key(&'a KeyString), Index(usize), } @@ -54,7 +50,7 @@ pub struct FieldsIter<'a> { impl<'a> FieldsIter<'a> { // TODO deprecate this in favor of `new_with_prefix`. - fn new(fields: &'a BTreeMap) -> FieldsIter<'a> { + fn new(fields: &'a ObjectMap) -> FieldsIter<'a> { FieldsIter { path_prefix: None, stack: vec![LeafIter::Map(fields.iter())], @@ -62,10 +58,7 @@ impl<'a> FieldsIter<'a> { } } - fn new_with_prefix( - path_prefix: PathPrefix, - fields: &'a BTreeMap, - ) -> FieldsIter<'a> { + fn new_with_prefix(path_prefix: PathPrefix, fields: &'a ObjectMap) -> FieldsIter<'a> { FieldsIter { path_prefix: Some(path_prefix), stack: vec![LeafIter::Map(fields.iter())], @@ -104,7 +97,7 @@ impl<'a> FieldsIter<'a> { self.path.pop(); } - fn make_path(&mut self, component: PathComponent<'a>) -> String { + fn make_path(&mut self, component: PathComponent<'a>) -> KeyString { let mut res = match self.path_prefix { None => String::new(), Some(prefix) => match prefix { @@ -115,7 +108,7 @@ impl<'a> FieldsIter<'a> { let mut path_iter = self.path.iter().chain(iter::once(&component)).peekable(); loop { match path_iter.next() { - None => return res, + None => break res.into(), Some(PathComponent::Key(key)) => { if key.contains('.') { res.push_str(&key.replace('.', "\\.")); @@ -135,7 +128,7 @@ impl<'a> FieldsIter<'a> { } impl<'a> Iterator for FieldsIter<'a> { - type Item = (String, &'a Value); + type Item = (KeyString, &'a Value); fn next(&mut self) -> Option { loop { @@ -161,9 +154,9 @@ impl<'a> Iterator for FieldsIter<'a> { } }, Some(LeafIter::Root((value, visited))) => { - let result = (!*visited).then(|| ("message".to_owned(), *value)); + let result = (!*visited).then(|| ("message".into(), *value)); *visited = true; - return result; + break result; } }; } @@ -250,7 +243,7 @@ mod test { ("a.array[3][0]", Value::Integer(2)), ("a.b.c", Value::Integer(5)), ("a\\.b\\.c", Value::Integer(6)), - ("d", Value::Object(BTreeMap::new())), + ("d", Value::Object(ObjectMap::new())), ("e", Value::Array(Vec::new())), ] .into_iter() diff --git a/lib/vector-core/src/event/util/log/keys.rs b/lib/vector-core/src/event/util/log/keys.rs index 9139d6a3c3b59..b958c989d3e35 100644 --- a/lib/vector-core/src/event/util/log/keys.rs +++ b/lib/vector-core/src/event/util/log/keys.rs @@ -1,11 +1,10 @@ -use std::collections::BTreeMap; - -use super::{all_fields, Value}; +use super::all_fields; +use crate::event::{KeyString, ObjectMap}; /// Iterates over all paths in form `a.b[0].c[1]` in alphabetical order. /// It is implemented as a wrapper around `all_fields` to reduce code /// duplication. -pub fn keys(fields: &BTreeMap) -> impl Iterator + '_ { +pub fn keys(fields: &ObjectMap) -> impl Iterator + '_ { all_fields(fields).map(|(k, _)| k) } diff --git a/lib/vector-core/src/event/util/log/mod.rs b/lib/vector-core/src/event/util/log/mod.rs index dc619db32c09d..b331397bcc774 100644 --- a/lib/vector-core/src/event/util/log/mod.rs +++ b/lib/vector-core/src/event/util/log/mod.rs @@ -4,17 +4,13 @@ mod keys; pub use all_fields::{all_fields, all_fields_non_object_root, all_metadata_fields}; pub use keys::keys; -use super::Value; - #[cfg(test)] mod test { - use std::collections::BTreeMap; - use serde_json::Value as JsonValue; - use super::Value; + use crate::event::{ObjectMap, Value}; - pub(crate) fn fields_from_json(json_value: JsonValue) -> BTreeMap { + pub(crate) fn fields_from_json(json_value: JsonValue) -> ObjectMap { match Value::from(json_value) { Value::Object(map) => map, something => panic!("Expected a map, got {something:?}"), diff --git a/lib/vector-core/src/event/util/mod.rs b/lib/vector-core/src/event/util/mod.rs index 952cbfe714256..f4ee9bc8d0d45 100644 --- a/lib/vector-core/src/event/util/mod.rs +++ b/lib/vector-core/src/event/util/mod.rs @@ -1,3 +1 @@ pub mod log; - -use super::Value; diff --git a/lib/vector-core/src/event/vrl_target.rs b/lib/vector-core/src/event/vrl_target.rs index b29439a9a1e43..79801b5eaf041 100644 --- a/lib/vector-core/src/event/vrl_target.rs +++ b/lib/vector-core/src/event/vrl_target.rs @@ -7,7 +7,7 @@ use snafu::Snafu; use vrl::compiler::value::VrlValueConvert; use vrl::compiler::{ProgramInfo, SecretTarget, Target}; use vrl::prelude::Collection; -use vrl::value::{Kind, Value}; +use vrl::value::{Kind, ObjectMap, Value}; use super::{Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent}; use crate::config::{log_schema, LogNamespace}; @@ -294,7 +294,7 @@ impl Target for VrlTarget { metric.remove_tags(); for (field, value) in &value { set_metric_tag_values( - field.as_str().to_owned(), + field[..].into(), value, metric, *multi_value_tags, @@ -539,7 +539,7 @@ fn target_get_mut_metric<'a>( /// This structure is partially populated based on the fields accessed by /// the VRL program as informed by `ProgramInfo`. fn precompute_metric_value(metric: &Metric, info: &ProgramInfo) -> Value { - let mut map = BTreeMap::default(); + let mut map = ObjectMap::default(); let mut set_name = false; let mut set_kind = false; @@ -552,36 +552,36 @@ fn precompute_metric_value(metric: &Metric, info: &ProgramInfo) -> Value { // Accessing a root path requires us to pre-populate all fields. if target_path == &OwnedTargetPath::event_root() { if !set_name { - map.insert("name".to_owned(), metric.name().to_owned().into()); + map.insert("name".into(), metric.name().to_owned().into()); } if !set_kind { - map.insert("kind".to_owned(), metric.kind().into()); + map.insert("kind".into(), metric.kind().into()); } if !set_type { - map.insert("type".to_owned(), metric.value().clone().into()); + map.insert("type".into(), metric.value().clone().into()); } if !set_namespace { if let Some(namespace) = metric.namespace() { - map.insert("namespace".to_owned(), namespace.to_owned().into()); + map.insert("namespace".into(), namespace.to_owned().into()); } } if !set_timestamp { if let Some(timestamp) = metric.timestamp() { - map.insert("timestamp".to_owned(), timestamp.into()); + map.insert("timestamp".into(), timestamp.into()); } } if !set_tags { if let Some(tags) = metric.tags().cloned() { map.insert( - "tags".to_owned(), + "tags".into(), tags.into_iter_single() - .map(|(tag, value)| (tag, value.into())) - .collect::>() + .map(|(tag, value)| (tag.into(), value.into())) + .collect::() .into(), ); } @@ -596,38 +596,38 @@ fn precompute_metric_value(metric: &Metric, info: &ProgramInfo) -> Value { match field.as_ref() { "name" if !set_name => { set_name = true; - map.insert("name".to_owned(), metric.name().to_owned().into()); + map.insert("name".into(), metric.name().to_owned().into()); } "kind" if !set_kind => { set_kind = true; - map.insert("kind".to_owned(), metric.kind().into()); + map.insert("kind".into(), metric.kind().into()); } "type" if !set_type => { set_type = true; - map.insert("type".to_owned(), metric.value().clone().into()); + map.insert("type".into(), metric.value().clone().into()); } "namespace" if !set_namespace && metric.namespace().is_some() => { set_namespace = true; map.insert( - "namespace".to_owned(), + "namespace".into(), metric.namespace().unwrap().to_owned().into(), ); } "timestamp" if !set_timestamp && metric.timestamp().is_some() => { set_timestamp = true; - map.insert("timestamp".to_owned(), metric.timestamp().unwrap().into()); + map.insert("timestamp".into(), metric.timestamp().unwrap().into()); } "tags" if !set_tags && metric.tags().is_some() => { set_tags = true; map.insert( - "tags".to_owned(), + "tags".into(), metric .tags() .cloned() .unwrap() .into_iter_single() - .map(|(tag, value)| (tag, value.into())) - .collect::>() + .map(|(tag, value)| (tag.into(), value.into())) + .collect::() .into(), ); } @@ -791,7 +791,7 @@ mod test { ]; for (value, path, expect) in cases { - let value: BTreeMap = value; + let value: ObjectMap = value; let info = ProgramInfo { fallible: false, abortable: false, @@ -895,7 +895,7 @@ mod test { ]; for (object, path, value, expect, result) in cases { - let object: BTreeMap = object; + let object: ObjectMap = object; let info = ProgramInfo { fallible: false, abortable: false, diff --git a/lib/vector-lookup/src/lookup_v2/mod.rs b/lib/vector-lookup/src/lookup_v2/mod.rs index de95977844d62..b92acd7916c0c 100644 --- a/lib/vector-lookup/src/lookup_v2/mod.rs +++ b/lib/vector-lookup/src/lookup_v2/mod.rs @@ -7,6 +7,7 @@ pub use vrl::path::{ parse_target_path, parse_value_path, BorrowedSegment, OwnedSegment, OwnedTargetPath, OwnedValuePath, PathConcat, PathParseError, PathPrefix, TargetPath, ValuePath, }; +use vrl::value::KeyString; /// A wrapper around `OwnedValuePath` that allows it to be used in Vector config. /// This requires a valid path to be used. If you want to allow optional paths, @@ -24,6 +25,14 @@ impl TryFrom for ConfigValuePath { } } +impl TryFrom for ConfigValuePath { + type Error = PathParseError; + + fn try_from(src: KeyString) -> Result { + OwnedValuePath::try_from(String::from(src)).map(ConfigValuePath) + } +} + impl From for String { fn from(owned: ConfigValuePath) -> Self { String::from(owned.0) @@ -60,6 +69,14 @@ impl TryFrom for ConfigTargetPath { } } +impl TryFrom for ConfigTargetPath { + type Error = PathParseError; + + fn try_from(src: KeyString) -> Result { + OwnedTargetPath::try_from(src).map(ConfigTargetPath) + } +} + impl From for String { fn from(owned: ConfigTargetPath) -> Self { String::from(owned.0) diff --git a/lib/vector-vrl/tests/src/test_enrichment.rs b/lib/vector-vrl/tests/src/test_enrichment.rs index 71ee66d87ff20..725c4cd4a2ab6 100644 --- a/lib/vector-vrl/tests/src/test_enrichment.rs +++ b/lib/vector-vrl/tests/src/test_enrichment.rs @@ -1,5 +1,5 @@ -use std::collections::{BTreeMap, HashMap}; -use vrl::value::Value; +use std::collections::HashMap; +use vrl::value::{ObjectMap, Value}; #[derive(Debug, Clone)] struct TestEnrichmentTable; @@ -11,11 +11,11 @@ impl enrichment::Table for TestEnrichmentTable { _condition: &'a [enrichment::Condition<'a>], _select: Option<&[String]>, _index: Option, - ) -> Result, String> { - let mut result = BTreeMap::new(); - result.insert("id".to_string(), Value::from(1)); - result.insert("firstname".to_string(), Value::from("Bob")); - result.insert("surname".to_string(), Value::from("Smith")); + ) -> Result { + let mut result = ObjectMap::new(); + result.insert("id".into(), Value::from(1)); + result.insert("firstname".into(), Value::from("Bob")); + result.insert("surname".into(), Value::from("Smith")); Ok(result) } @@ -26,16 +26,16 @@ impl enrichment::Table for TestEnrichmentTable { _condition: &'a [enrichment::Condition<'a>], _select: Option<&[String]>, _index: Option, - ) -> Result>, String> { - let mut result1 = BTreeMap::new(); - result1.insert("id".to_string(), Value::from(1)); - result1.insert("firstname".to_string(), Value::from("Bob")); - result1.insert("surname".to_string(), Value::from("Smith")); + ) -> Result, String> { + let mut result1 = ObjectMap::new(); + result1.insert("id".into(), Value::from(1)); + result1.insert("firstname".into(), Value::from("Bob")); + result1.insert("surname".into(), Value::from("Smith")); - let mut result2 = BTreeMap::new(); - result2.insert("id".to_string(), Value::from(2)); - result2.insert("firstname".to_string(), Value::from("Fred")); - result2.insert("surname".to_string(), Value::from("Smith")); + let mut result2 = ObjectMap::new(); + result2.insert("id".into(), Value::from(2)); + result2.insert("firstname".into(), Value::from("Fred")); + result2.insert("surname".into(), Value::from("Smith")); Ok(vec![result1, result2]) } @@ -61,7 +61,7 @@ impl enrichment::Table for TestEnrichmentTable { pub(crate) fn test_enrichment_table() -> enrichment::TableRegistry { let registry = enrichment::TableRegistry::default(); let mut tables: HashMap> = HashMap::new(); - tables.insert("test".to_string(), Box::new(TestEnrichmentTable)); + tables.insert("test".into(), Box::new(TestEnrichmentTable)); registry.load(tables); registry diff --git a/lib/vector-vrl/web-playground/build.rs b/lib/vector-vrl/web-playground/build.rs index c12e6767bfdbd..f6d42756f1057 100644 --- a/lib/vector-vrl/web-playground/build.rs +++ b/lib/vector-vrl/web-playground/build.rs @@ -53,7 +53,7 @@ fn write_build_constants(manifest: &Manifest, dest_path: &Path) -> io::Result<() .unwrap() .version .clone() - .unwrap(); + .unwrap_or_else(|| "FIXME".into()); let vrl_version_const = create_const_statement("VRL_VERSION", vrl_version); output_file .write_all(vrl_version_const.as_bytes()) diff --git a/src/api/schema/events/metric.rs b/src/api/schema/events/metric.rs index 23975758b3a5d..c750c2aa1bb85 100644 --- a/src/api/schema/events/metric.rs +++ b/src/api/schema/events/metric.rs @@ -1,5 +1,3 @@ -use std::collections::BTreeMap; - use async_graphql::{Enum, Object}; use chrono::{DateTime, Utc}; use serde_json::Value; @@ -7,7 +5,7 @@ use vector_common::encode_logfmt; use super::EventEncodingType; use crate::{ - event::{self}, + event::{self, KeyString}, topology::TapOutput, }; @@ -130,7 +128,7 @@ impl Metric { .expect("logfmt serialization of metric event failed: conversion to serde Value failed. Please report."); match json { Value::Object(map) => encode_logfmt::encode_map( - &map.into_iter().collect::>(), + &map.into_iter().map(|(k,v)| (KeyString::from(k), v)).collect(), ) .expect("logfmt serialization of metric event failed. Please report."), _ => panic!("logfmt serialization of metric event failed: metric converted to unexpected serde Value. Please report."), diff --git a/src/codecs/encoding/transformer.rs b/src/codecs/encoding/transformer.rs index bd5aee8b7f0bc..aa955d4ae113c 100644 --- a/src/codecs/encoding/transformer.rs +++ b/src/codecs/encoding/transformer.rs @@ -148,7 +148,7 @@ impl Transformer { let mut new_log = LogEvent::from(old_value); if let Some(service) = new_log.remove(service_path) { log.metadata_mut() - .add_dropped_field(meaning::SERVICE.to_string(), service); + .add_dropped_field(meaning::SERVICE.into(), service); } } } @@ -169,7 +169,7 @@ impl Transformer { if let (Some(v), Some(service_path)) = (value, service_path) { if service_path.path == *value_path { log.metadata_mut() - .add_dropped_field(meaning::SERVICE.to_string(), v); + .add_dropped_field(meaning::SERVICE.into(), v); } } } diff --git a/src/enrichment_tables/file.rs b/src/enrichment_tables/file.rs index 6c92bf395f244..ea9a534396c20 100644 --- a/src/enrichment_tables/file.rs +++ b/src/enrichment_tables/file.rs @@ -1,18 +1,12 @@ //! Handles enrichment tables for `type = file`. -use std::{ - collections::{BTreeMap, HashMap}, - fs, - hash::Hasher, - path::PathBuf, - time::SystemTime, -}; +use std::{collections::HashMap, fs, hash::Hasher, path::PathBuf, time::SystemTime}; use bytes::Bytes; use enrichment::{Case, Condition, IndexHandle, Table}; use tracing::trace; use vector_common::{conversion::Conversion, TimeZone}; use vector_config::configurable_component; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; use crate::config::EnrichmentTableConfig; @@ -311,7 +305,7 @@ impl File { }) } - fn add_columns(&self, select: Option<&[String]>, row: &[Value]) -> BTreeMap { + fn add_columns(&self, select: Option<&[String]>, row: &[Value]) -> ObjectMap { self.headers .iter() .zip(row) @@ -321,7 +315,7 @@ impl File { // If no select is passed, we assume all columns are included .unwrap_or(true) }) - .map(|(header, col)| (header.clone(), col.clone())) + .map(|(header, col)| (header.as_str().into(), col.clone())) .collect() } @@ -399,7 +393,7 @@ impl File { case: Case, condition: &'a [Condition<'a>], select: Option<&'a [String]>, - ) -> impl Iterator> + 'a + ) -> impl Iterator + 'a where I: Iterator> + 'a, { @@ -484,7 +478,7 @@ impl Table for File { condition: &'a [Condition<'a>], select: Option<&'a [String]>, index: Option, - ) -> Result, String> { + ) -> Result { match index { None => { // No index has been passed so we need to do a Sequential Scan. @@ -509,7 +503,7 @@ impl Table for File { condition: &'a [Condition<'a>], select: Option<&'a [String]>, index: Option, - ) -> Result>, String> { + ) -> Result, String> { match index { None => { // No index has been passed so we need to do a Sequential Scan. @@ -717,9 +711,9 @@ mod tests { }; assert_eq!( - Ok(BTreeMap::from([ - (String::from("field1"), Value::from("zirp")), - (String::from("field2"), Value::from("zurp")), + Ok(ObjectMap::from([ + ("field1".into(), Value::from("zirp")), + ("field2".into(), Value::from("zurp")), ])), file.find_table_row(Case::Sensitive, &[condition], None, None) ); @@ -785,9 +779,9 @@ mod tests { }; assert_eq!( - Ok(BTreeMap::from([ - (String::from("field1"), Value::from("zirp")), - (String::from("field2"), Value::from("zurp")), + Ok(ObjectMap::from([ + ("field1".into(), Value::from("zirp")), + ("field2".into(), Value::from("zurp")), ])), file.find_table_row(Case::Sensitive, &[condition], None, Some(handle)) ); @@ -810,13 +804,13 @@ mod tests { assert_eq!( Ok(vec![ - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field2"), Value::from("zup")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field2".into(), Value::from("zup")), ]), - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field2"), Value::from("zoop")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field2".into(), Value::from("zoop")), ]), ]), file.find_table_rows( diff --git a/src/enrichment_tables/geoip.rs b/src/enrichment_tables/geoip.rs index 3a46bf17d8fac..add7b3374814d 100644 --- a/src/enrichment_tables/geoip.rs +++ b/src/enrichment_tables/geoip.rs @@ -12,7 +12,7 @@ use maxminddb::{ MaxMindDBError, Reader, }; use vector_config::configurable_component; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; use crate::config::{EnrichmentTableConfig, GenerateConfig}; @@ -132,14 +132,14 @@ impl Geoip { } } - fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option> { - let mut map = BTreeMap::new(); + fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option { + let mut map = ObjectMap::new(); let mut add_field = |key: &str, value: Option| { if select .map(|fields| fields.iter().any(|field| field == key)) .unwrap_or(true) { - map.insert(key.to_string(), value.unwrap_or(Value::Null)); + map.insert(key.into(), value.unwrap_or(Value::Null)); } }; @@ -234,7 +234,7 @@ impl Table for Geoip { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result, String> { + ) -> Result { let mut rows = self.find_table_rows(case, condition, select, index)?; match rows.pop() { @@ -253,7 +253,7 @@ impl Table for Geoip { condition: &'a [Condition<'a>], select: Option<&[String]>, _: Option, - ) -> Result>, String> { + ) -> Result, String> { match condition.get(0) { Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()), Some(Condition::Equals { value, .. }) => { @@ -315,7 +315,7 @@ mod tests { fn city_lookup() { let values = find("2.125.160.216", "tests/data/GeoIP2-City-Test.mmdb").unwrap(); - let mut expected = BTreeMap::::new(); + let mut expected = ObjectMap::new(); expected.insert("city_name".to_string(), "Boxford".into()); expected.insert("country_code".to_string(), "GB".into()); expected.insert("continent_code".to_string(), "EU".into()); @@ -340,7 +340,7 @@ mod tests { ) .unwrap(); - let mut expected = BTreeMap::::new(); + let mut expected = ObjectMap::new(); expected.insert("latitude".to_string(), Value::from(51.75)); expected.insert("longitude".to_string(), Value::from(-1.25)); @@ -351,7 +351,7 @@ mod tests { fn city_lookup_partial_results() { let values = find("67.43.156.9", "tests/data/GeoIP2-City-Test.mmdb").unwrap(); - let mut expected = BTreeMap::::new(); + let mut expected = ObjectMap::new(); expected.insert("city_name".to_string(), Value::Null); expected.insert("country_code".to_string(), "BT".into()); expected.insert("country_name".to_string(), "Bhutan".into()); @@ -378,7 +378,7 @@ mod tests { fn isp_lookup() { let values = find("208.192.1.2", "tests/data/GeoIP2-ISP-Test.mmdb").unwrap(); - let mut expected = BTreeMap::::new(); + let mut expected = ObjectMap::new(); expected.insert("autonomous_system_number".to_string(), 701i64.into()); expected.insert( "autonomous_system_organization".to_string(), @@ -394,7 +394,7 @@ mod tests { fn isp_lookup_partial_results() { let values = find("2600:7000::1", "tests/data/GeoLite2-ASN-Test.mmdb").unwrap(); - let mut expected = BTreeMap::::new(); + let mut expected = ObjectMap::new(); expected.insert("autonomous_system_number".to_string(), 6939i64.into()); expected.insert( "autonomous_system_organization".to_string(), @@ -421,7 +421,7 @@ mod tests { ) .unwrap(); - let mut expected = BTreeMap::::new(); + let mut expected = ObjectMap::new(); expected.insert("connection_type".to_string(), "Corporate".into()); assert_eq!(values, expected); @@ -434,15 +434,11 @@ mod tests { assert!(values.is_none()); } - fn find(ip: &str, database: &str) -> Option> { + fn find(ip: &str, database: &str) -> Option { find_select(ip, database, None) } - fn find_select( - ip: &str, - database: &str, - select: Option<&[String]>, - ) -> Option> { + fn find_select(ip: &str, database: &str, select: Option<&[String]>) -> Option { Geoip::new(GeoipConfig { path: database.to_string(), locale: default_locale(), diff --git a/src/sinks/datadog/traces/apm_stats/aggregation.rs b/src/sinks/datadog/traces/apm_stats/aggregation.rs index 7d990b6c803e4..e1897df613233 100644 --- a/src/sinks/datadog/traces/apm_stats/aggregation.rs +++ b/src/sinks/datadog/traces/apm_stats/aggregation.rs @@ -7,7 +7,7 @@ use super::{ bucket::Bucket, ClientStatsBucket, ClientStatsPayload, PartitionKey, BUCKET_DURATION_NANOSECONDS, }; -use crate::event::{TraceEvent, Value}; +use crate::event::{ObjectMap, TraceEvent, Value}; const MEASURED_KEY: &str = "_dd.measured"; const PARTIAL_VERSION_KEY: &str = "_dd.partial_version"; @@ -26,7 +26,7 @@ pub(crate) struct AggregationKey { impl AggregationKey { fn new_aggregation_from_span( - span: &BTreeMap, + span: &ObjectMap, payload_key: PayloadAggregationKey, synthetics: bool, ) -> Self { @@ -71,7 +71,7 @@ pub(crate) struct PayloadAggregationKey { } impl PayloadAggregationKey { - fn with_span_context(self, span: &BTreeMap) -> Self { + fn with_span_context(self, span: &ObjectMap) -> Self { PayloadAggregationKey { env: span .get("meta") @@ -221,7 +221,7 @@ impl Aggregator { /// The key is constructed from various span/trace properties (see `AggregationKey`). fn handle_span( &mut self, - span: &BTreeMap, + span: &ObjectMap, weight: f64, is_top: bool, synthetics: bool, @@ -387,7 +387,7 @@ const fn align_timestamp(start: u64) -> u64 { /// Assumes that all metrics are all encoded as Value::Float. /// Return the f64 of the specified key or None of key not present. -fn get_metric_value_float(span: &BTreeMap, key: &str) -> Option { +fn get_metric_value_float(span: &ObjectMap, key: &str) -> Option { span.get("metrics") .and_then(|m| m.as_object()) .map(|m| match m.get(key) { @@ -399,7 +399,7 @@ fn get_metric_value_float(span: &BTreeMap, key: &str) -> Option, key: &str) -> bool { +fn metric_value_is_1(span: &ObjectMap, key: &str) -> bool { match get_metric_value_float(span, key) { Some(f) => f == 1.0, None => false, @@ -407,14 +407,14 @@ fn metric_value_is_1(span: &BTreeMap, key: &str) -> bool { } /// Returns true if span is top-level. -fn has_top_level(span: &BTreeMap) -> bool { +fn has_top_level(span: &ObjectMap) -> bool { // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L28-L31 metric_value_is_1(span, TOP_LEVEL_KEY) } /// Returns true if a span should be measured (i.e. it should get trace metrics calculated). -fn is_measured(span: &BTreeMap) -> bool { +fn is_measured(span: &ObjectMap) -> bool { // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L40-L43 metric_value_is_1(span, MEASURED_KEY) @@ -424,7 +424,7 @@ fn is_measured(span: &BTreeMap) -> bool { /// These types of spans are partial images of long-running spans. /// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive integer. /// The metric usually increases each time a new version of the same span is sent by the tracer -fn is_partial_snapshot(span: &BTreeMap) -> bool { +fn is_partial_snapshot(span: &ObjectMap) -> bool { // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L49-L52 match get_metric_value_float(span, PARTIAL_VERSION_KEY) { diff --git a/src/sinks/datadog/traces/apm_stats/bucket.rs b/src/sinks/datadog/traces/apm_stats/bucket.rs index a1d0a9f198508..7dd2334e7fe27 100644 --- a/src/sinks/datadog/traces/apm_stats/bucket.rs +++ b/src/sinks/datadog/traces/apm_stats/bucket.rs @@ -6,7 +6,7 @@ use super::{ aggregation::{AggregationKey, PayloadAggregationKey}, ddsketch_full, ClientGroupedStats, ClientStatsBucket, }; -use crate::{event::Value, metrics::AgentDDSketch}; +use crate::{event::ObjectMap, event::Value, metrics::AgentDDSketch}; pub(crate) struct GroupedStats { hits: f64, @@ -142,7 +142,7 @@ impl Bucket { pub(crate) fn add( &mut self, - span: &BTreeMap, + span: &ObjectMap, weight: f64, is_top: bool, aggkey: AggregationKey, @@ -159,7 +159,7 @@ impl Bucket { /// Update a bucket with a new span. Computed statistics include the number of hits and the actual distribution of /// execution time, with isolated measurements for spans flagged as errored and spans without error. - fn update(span: &BTreeMap, weight: f64, is_top: bool, gs: &mut GroupedStats) { + fn update(span: &ObjectMap, weight: f64, is_top: bool, gs: &mut GroupedStats) { is_top.then(|| { gs.top_level_hits += weight; }); diff --git a/src/sinks/datadog/traces/apm_stats/weight.rs b/src/sinks/datadog/traces/apm_stats/weight.rs index dfce6fbdb9f57..48d6195e06606 100644 --- a/src/sinks/datadog/traces/apm_stats/weight.rs +++ b/src/sinks/datadog/traces/apm_stats/weight.rs @@ -1,11 +1,11 @@ use std::collections::BTreeMap; -use crate::event::Value; +use crate::event::{ObjectMap, Value}; const SAMPLING_RATE_KEY: &str = "_sample_rate"; /// This extracts the relative weights from the top level span (i.e. the span that does not have a parent). -pub(crate) fn extract_weight_from_root_span(spans: &[&BTreeMap]) -> f64 { +pub(crate) fn extract_weight_from_root_span(spans: &[&ObjectMap]) -> f64 { // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/weight.go#L17-L26. // TODO this logic likely has a bug(s) that need to be root caused. The root span is not reliably found and defaults to "1.0" diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs index 01ec1742a381e..900aa5a7b448c 100644 --- a/src/sinks/datadog/traces/request_builder.rs +++ b/src/sinks/datadog/traces/request_builder.rs @@ -20,7 +20,7 @@ use super::{ sink::PartitionKey, }; use crate::{ - event::{Event, TraceEvent, Value}, + event::{Event, KeyString, ObjectMap, TraceEvent, Value}, sinks::util::{ metadata::RequestMetadataBuilder, Compression, Compressor, IncrementalRequestBuilder, }, @@ -289,7 +289,7 @@ impl DatadogTracesEncoder { .map(|m| { m.iter() .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned())) - .collect::>() + .collect::>() }) .unwrap_or_default(); @@ -319,7 +319,7 @@ impl DatadogTracesEncoder { .and_then(|v| v.as_boolean()) .unwrap_or(false), spans, - tags: tags.clone(), + tags: to_string_map(&tags), }; dd_proto::TracerPayload { @@ -344,7 +344,7 @@ impl DatadogTracesEncoder { .map(|v| v.to_string_lossy().into_owned()) .unwrap_or_default(), chunks: vec![chunk], - tags, + tags: into_string_map(tags), env: trace .get(event_path!("env")) .map(|v| v.to_string_lossy().into_owned()) @@ -360,7 +360,7 @@ impl DatadogTracesEncoder { } } - fn convert_span(span: &BTreeMap) -> dd_proto::Span { + fn convert_span(span: &ObjectMap) -> dd_proto::Span { let trace_id = match span.get("trace_id") { Some(Value::Integer(val)) => *val, _ => 0, @@ -394,7 +394,7 @@ impl DatadogTracesEncoder { .map(|m| { m.iter() .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned())) - .collect::>() + .collect::>() }) .unwrap_or_default(); @@ -404,7 +404,7 @@ impl DatadogTracesEncoder { .map(|m| { m.iter() .map(|(k, v)| (k.clone(), v.coerce_to_bytes().into_iter().collect())) - .collect::>>() + .collect::>>() }) .unwrap_or_default(); @@ -420,7 +420,7 @@ impl DatadogTracesEncoder { None } }) - .collect::>() + .collect::>() }) .unwrap_or_default(); @@ -447,9 +447,19 @@ impl DatadogTracesEncoder { error: error as i32, start, duration, - meta, - metrics, - meta_struct, + meta: into_string_map(meta), + metrics: into_string_map(metrics), + meta_struct: into_string_map(meta_struct), } } } + +fn to_string_map(map: &BTreeMap) -> BTreeMap { + map.iter() + .map(|(k, v)| (k.to_string(), v.clone())) + .collect() +} + +fn into_string_map(map: BTreeMap) -> BTreeMap { + map.into_iter().map(|(k, v)| (k.into(), v)).collect() +} diff --git a/src/sinks/datadog/traces/tests.rs b/src/sinks/datadog/traces/tests.rs index af9723c236970..8c78bf9cf09f6 100644 --- a/src/sinks/datadog/traces/tests.rs +++ b/src/sinks/datadog/traces/tests.rs @@ -56,8 +56,8 @@ async fn start_test( .await } -fn simple_span(resource: String) -> BTreeMap { - BTreeMap::::from([ +fn simple_span(resource: String) -> ObjectMap { + ObjectMap::from([ ("service".to_string(), Value::from("a_service")), ("name".to_string(), Value::from("a_name")), ("resource".to_string(), Value::from(resource)), @@ -73,14 +73,14 @@ fn simple_span(resource: String) -> BTreeMap { ("error".to_string(), Value::Integer(404)), ( "meta".to_string(), - Value::Object(BTreeMap::::from([ + Value::Object(ObjectMap::from([ ("foo".to_string(), Value::from("bar")), ("bar".to_string(), Value::from("baz")), ])), ), ( "metrics".to_string(), - Value::Object(BTreeMap::::from([ + Value::Object(ObjectMap::from([ ( "a_metric".to_string(), Value::Float(NotNan::new(0.577).unwrap()), diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index f8a8a55ac7882..eb78977031b6e 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -68,8 +68,8 @@ fn data_stream_body( dtype: Option, dataset: Option, namespace: Option, -) -> BTreeMap { - let mut ds = BTreeMap::::new(); +) -> ObjectMap { + let mut ds = ObjectMap::new(); if let Some(dtype) = dtype { ds.insert("type".into(), Value::from(dtype)); diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index f84b75daef9a6..7038c5ff6d9a5 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -14,17 +14,17 @@ use vector_config::configurable_component; use vector_core::config::log_schema; use vector_core::schema; +use super::{ + encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field, + InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion, +}; use crate::{ codecs::Transformer, config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, - event::{Event, MetricTags, Value}, + event::{Event, KeyString, MetricTags, Value}, http::HttpClient, internal_events::InfluxdbEncodingError, sinks::{ - influxdb::{ - encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field, - InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion, - }, util::{ http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, BatchConfig, Buffer, Compression, SinkBatchSettings, TowerRequestConfig, @@ -75,7 +75,7 @@ pub struct InfluxDbLogsConfig { #[serde(default)] #[configurable(metadata(docs::examples = "field1"))] #[configurable(metadata(docs::examples = "parent.child_field"))] - pub tags: Vec, + pub tags: Vec, #[serde(flatten)] pub influxdb1_settings: Option, @@ -138,7 +138,7 @@ struct InfluxDbLogsSink { token: String, protocol_version: ProtocolVersion, measurement: String, - tags: HashSet, + tags: HashSet, transformer: Transformer, host_key: OwnedValuePath, message_key: OwnedValuePath, @@ -164,7 +164,7 @@ impl GenerateConfig for InfluxDbLogsConfig { impl SinkConfig for InfluxDbLogsConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let measurement = self.get_measurement()?; - let tags: HashSet = self.tags.clone().into_iter().collect(); + let tags: HashSet = self.tags.iter().cloned().collect(); let tls_settings = TlsSettings::from_options(&self.tls)?; let client = HttpClient::new(tls_settings, cx.proxy())?; @@ -251,7 +251,7 @@ impl SinkConfig for InfluxDbLogsConfig { struct InfluxDbLogsEncoder { protocol_version: ProtocolVersion, measurement: String, - tags: HashSet, + tags: HashSet, transformer: Transformer, host_key: OwnedValuePath, message_key: OwnedValuePath, @@ -271,16 +271,16 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { // Add the `host` and `source_type` to the HashSet of tags to include // Ensure those paths are on the event to be encoded, rather than metadata if let Some(host_path) = log.host_path().cloned().as_ref() { - self.tags.replace(host_path.path.to_string()); + self.tags.replace(host_path.path.to_string().into()); log.rename_key(host_path, (PathPrefix::Event, &self.host_key)); } if let Some(source_type_path) = log.source_type_path().cloned().as_ref() { - self.tags.replace(source_type_path.path.to_string()); + self.tags.replace(source_type_path.path.to_string().into()); log.rename_key(source_type_path, (PathPrefix::Event, &self.source_type_key)); } - self.tags.replace("metric_type".to_string()); + self.tags.replace("metric_type".into()); log.insert(event_path!("metric_type"), "logs"); // Timestamp @@ -297,10 +297,10 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { // Tags + Fields let mut tags = MetricTags::default(); - let mut fields: HashMap = HashMap::new(); + let mut fields: HashMap = HashMap::new(); log.convert_to_fields().for_each(|(key, value)| { - if self.tags.contains(&key) { - tags.replace(key, value.to_string_lossy().into_owned()); + if self.tags.contains(&key[..]) { + tags.replace(key.into(), value.to_string_lossy().into_owned()); } else { fields.insert(key, to_field(value)); } diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index 31faab4ffe644..2dade43606f54 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -14,7 +14,7 @@ use crate::{ config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext}, event::{ metric::{Metric, MetricValue, Sample, StatisticKind}, - Event, + Event, KeyString, }, http::HttpClient, internal_events::InfluxdbEncodingError, @@ -319,7 +319,7 @@ fn encode_events( fn get_type_and_fields( value: &MetricValue, quantiles: &[f64], -) -> (&'static str, Option>) { +) -> (&'static str, Option>) { match value { MetricValue::Counter { value } => ("counter", Some(to_fields(*value))), MetricValue::Gauge { value } => ("gauge", Some(to_fields(*value))), @@ -329,17 +329,17 @@ fn get_type_and_fields( count, sum, } => { - let mut fields: HashMap = buckets + let mut fields: HashMap = buckets .iter() .map(|sample| { ( - format!("bucket_{}", sample.upper_limit), + format!("bucket_{}", sample.upper_limit).into(), Field::UnsignedInt(sample.count), ) }) .collect(); - fields.insert("count".to_owned(), Field::UnsignedInt(*count)); - fields.insert("sum".to_owned(), Field::Float(*sum)); + fields.insert("count".into(), Field::UnsignedInt(*count)); + fields.insert("sum".into(), Field::Float(*sum)); ("histogram", Some(fields)) } @@ -348,17 +348,17 @@ fn get_type_and_fields( count, sum, } => { - let mut fields: HashMap = quantiles + let mut fields: HashMap = quantiles .iter() .map(|quantile| { ( - format!("quantile_{}", quantile.quantile), + format!("quantile_{}", quantile.quantile).into(), Field::Float(quantile.value), ) }) .collect(); - fields.insert("count".to_owned(), Field::UnsignedInt(*count)); - fields.insert("sum".to_owned(), Field::Float(*sum)); + fields.insert("count".into(), Field::UnsignedInt(*count)); + fields.insert("sum".into(), Field::Float(*sum)); ("summary", Some(fields)) } @@ -382,31 +382,25 @@ fn get_type_and_fields( value: ddsketch.quantile(*q).unwrap_or(0.0), }; ( - quantile.to_percentile_string(), + quantile.to_percentile_string().into(), Field::Float(quantile.value), ) }) - .collect::>(); + .collect::>(); fields.insert( - "count".to_owned(), + "count".into(), Field::UnsignedInt(u64::from(ddsketch.count())), ); fields.insert( - "min".to_owned(), + "min".into(), Field::Float(ddsketch.min().unwrap_or(f64::MAX)), ); fields.insert( - "max".to_owned(), + "max".into(), Field::Float(ddsketch.max().unwrap_or(f64::MIN)), ); - fields.insert( - "sum".to_owned(), - Field::Float(ddsketch.sum().unwrap_or(0.0)), - ); - fields.insert( - "avg".to_owned(), - Field::Float(ddsketch.avg().unwrap_or(0.0)), - ); + fields.insert("sum".into(), Field::Float(ddsketch.sum().unwrap_or(0.0))); + fields.insert("avg".into(), Field::Float(ddsketch.avg().unwrap_or(0.0))); ("sketch", Some(fields)) } @@ -414,34 +408,33 @@ fn get_type_and_fields( } } -fn encode_distribution(samples: &[Sample], quantiles: &[f64]) -> Option> { +fn encode_distribution(samples: &[Sample], quantiles: &[f64]) -> Option> { let statistic = DistributionStatistic::from_samples(samples, quantiles)?; - let fields: HashMap = vec![ - ("min".to_owned(), Field::Float(statistic.min)), - ("max".to_owned(), Field::Float(statistic.max)), - ("median".to_owned(), Field::Float(statistic.median)), - ("avg".to_owned(), Field::Float(statistic.avg)), - ("sum".to_owned(), Field::Float(statistic.sum)), - ("count".to_owned(), Field::Float(statistic.count as f64)), - ] - .into_iter() - .chain( - statistic - .quantiles - .iter() - .map(|&(p, val)| (format!("quantile_{:.2}", p), Field::Float(val))), + Some( + [ + ("min".into(), Field::Float(statistic.min)), + ("max".into(), Field::Float(statistic.max)), + ("median".into(), Field::Float(statistic.median)), + ("avg".into(), Field::Float(statistic.avg)), + ("sum".into(), Field::Float(statistic.sum)), + ("count".into(), Field::Float(statistic.count as f64)), + ] + .into_iter() + .chain( + statistic + .quantiles + .iter() + .map(|&(p, val)| (format!("quantile_{:.2}", p).into(), Field::Float(val))), + ) + .collect(), ) - .collect(); - - Some(fields) } -fn to_fields(value: f64) -> HashMap { - let fields: HashMap = vec![("value".to_owned(), Field::Float(value))] +fn to_fields(value: f64) -> HashMap { + [("value".into(), Field::Float(value))] .into_iter() - .collect(); - fields + .collect() } #[cfg(test)] diff --git a/src/sinks/influxdb/mod.rs b/src/sinks/influxdb/mod.rs index a2b322c74ddf8..28cfb7c347845 100644 --- a/src/sinks/influxdb/mod.rs +++ b/src/sinks/influxdb/mod.rs @@ -11,7 +11,7 @@ use snafu::{ResultExt, Snafu}; use tower::Service; use vector_common::sensitive_string::SensitiveString; use vector_config::configurable_component; -use vector_core::event::MetricTags; +use vector_core::event::{KeyString, MetricTags}; use crate::http::HttpClient; @@ -232,7 +232,7 @@ pub(in crate::sinks) fn influx_line_protocol( protocol_version: ProtocolVersion, measurement: &str, tags: Option, - fields: Option>, + fields: Option>, timestamp: i64, line_protocol: &mut BytesMut, ) -> Result<(), &'static str> { @@ -284,7 +284,7 @@ fn encode_tags(tags: MetricTags, output: &mut BytesMut) { fn encode_fields( protocol_version: ProtocolVersion, - fields: HashMap, + fields: HashMap, output: &mut BytesMut, ) { let original_len = output.len(); diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index 9d9046788d04b..6865e88299877 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -654,7 +654,7 @@ mod tests { } } "#; - let msg: BTreeMap = serde_json::from_str(message)?; + let msg: ObjectMap = serde_json::from_str(message)?; let event = Event::Log(LogEvent::from(msg)); let record = encoder.encode_event(event).unwrap(); @@ -699,7 +699,7 @@ mod tests { } } "#; - let msg: BTreeMap = serde_json::from_str(message)?; + let msg: ObjectMap = serde_json::from_str(message)?; let event = Event::Log(LogEvent::from(msg)); let record = encoder.encode_event(event).unwrap(); @@ -727,7 +727,7 @@ mod tests { remove_timestamp: false, }; - let msg: BTreeMap = serde_json::from_str("{}")?; + let msg: ObjectMap = serde_json::from_str("{}")?; let event = Event::Log(LogEvent::from(msg)); let record = encoder.encode_event(event).unwrap(); diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs index 5a8b2fc800897..86c2e67a4079e 100644 --- a/src/sinks/new_relic/model.rs +++ b/src/sinks/new_relic/model.rs @@ -12,7 +12,7 @@ use vector_common::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTEN use vrl::event_path; use super::NewRelicSinkError; -use crate::event::{Event, MetricKind, MetricValue, Value}; +use crate::event::{Event, KeyString, MetricKind, MetricValue, Value}; #[derive(Debug)] pub enum NewRelicApiModel { @@ -21,8 +21,8 @@ pub enum NewRelicApiModel { Logs(LogsApiModel), } -type KeyValData = HashMap; -type DataStore = HashMap>; +type KeyValData = HashMap; +type DataStore = HashMap>; #[derive(Serialize, Deserialize, Debug)] pub struct MetricsApiModel(pub Vec); @@ -30,7 +30,7 @@ pub struct MetricsApiModel(pub Vec); impl MetricsApiModel { pub fn new(metric_array: Vec) -> Self { let mut metric_store = DataStore::new(); - metric_store.insert("metrics".to_owned(), metric_array); + metric_store.insert("metrics".into(), metric_array); Self(vec![metric_store]) } } @@ -66,10 +66,8 @@ impl TryFrom> for MetricsApiModel { num_missing_interval += 1; return None; }; - metric_data.insert( - "interval.ms".to_owned(), - Value::from(interval_ms.get() as i64), - ); + metric_data + .insert("interval.ms".into(), Value::from(interval_ms.get() as i64)); (value, "count") } (MetricValue::Counter { value }, MetricKind::Absolute) => (value, "gauge"), @@ -82,15 +80,15 @@ impl TryFrom> for MetricsApiModel { }; // Set name, type, value, timestamp, and attributes - metric_data.insert("name".to_owned(), Value::from(series.name.name)); - metric_data.insert("type".to_owned(), Value::from(metric_type)); + metric_data.insert("name".into(), Value::from(series.name.name)); + metric_data.insert("type".into(), Value::from(metric_type)); let Some(value) = NotNan::new(value).ok() else { num_nan_value += 1; return None; }; - metric_data.insert("value".to_owned(), Value::from(value)); + metric_data.insert("value".into(), Value::from(value)); metric_data.insert( - "timestamp".to_owned(), + "timestamp".into(), Value::from( data.time .timestamp @@ -100,10 +98,10 @@ impl TryFrom> for MetricsApiModel { ); if let Some(tags) = series.tags { metric_data.insert( - "attributes".to_owned(), + "attributes".into(), Value::from( tags.iter_single() - .map(|(key, value)| (key.to_string(), Value::from(value))) + .map(|(key, value)| (key.into(), Value::from(value))) .collect::>(), ), ); @@ -162,7 +160,7 @@ impl TryFrom> for EventsApiModel { let mut num_non_log_events = 0; let mut num_nan_value = 0; - let events_array: Vec> = buf_events + let events_array: Vec> = buf_events .into_iter() .filter_map(|event| { let Some(log) = event.try_into_log() else { @@ -184,23 +182,23 @@ impl TryFrom> for EventsApiModel { for (k, v) in json_map { match v { serde_json::Value::String(s) => { - event_model.insert(k, Value::from(s)); + event_model.insert(k.into(), Value::from(s)); } serde_json::Value::Number(n) => { if let Some(f) = n.as_f64() { event_model.insert( - k, + k.into(), Value::from(NotNan::new(f).ok().or_else(|| { num_nan_value += 1; None })?), ); } else { - event_model.insert(k, Value::from(n.as_i64())); + event_model.insert(k.into(), Value::from(n.as_i64())); } } serde_json::Value::Bool(b) => { - event_model.insert(k, Value::from(b)); + event_model.insert(k.into(), Value::from(b)); } _ => { // Note that arrays and nested objects are silently dropped. @@ -212,8 +210,7 @@ impl TryFrom> for EventsApiModel { } if event_model.get("eventType").is_none() { - event_model - .insert("eventType".to_owned(), Value::from("VectorSink".to_owned())); + event_model.insert("eventType".into(), Value::from("VectorSink".to_owned())); } Some(event_model) @@ -247,7 +244,7 @@ pub struct LogsApiModel(pub Vec); impl LogsApiModel { pub fn new(logs_array: Vec) -> Self { let mut logs_store = DataStore::new(); - logs_store.insert("logs".to_owned(), logs_array); + logs_store.insert("logs".into(), logs_array); Self(vec![logs_store]) } } @@ -258,7 +255,7 @@ impl TryFrom> for LogsApiModel { fn try_from(buf_events: Vec) -> Result { let mut num_non_log_events = 0; - let logs_array: Vec> = buf_events + let logs_array: Vec> = buf_events .into_iter() .filter_map(|event| { let Some(log) = event.try_into_log() else { @@ -271,10 +268,7 @@ impl TryFrom> for LogsApiModel { log_model.insert(k, v.clone()); } if log.get(event_path!("message")).is_none() { - log_model.insert( - "message".to_owned(), - Value::from("log from vector".to_owned()), - ); + log_model.insert("message".into(), Value::from("log from vector".to_owned())); } Some(log_model) diff --git a/src/sinks/pulsar/request_builder.rs b/src/sinks/pulsar/request_builder.rs index de62f179dcb30..962fbeb6eb7d6 100644 --- a/src/sinks/pulsar/request_builder.rs +++ b/src/sinks/pulsar/request_builder.rs @@ -2,6 +2,7 @@ use bytes::Bytes; use std::collections::HashMap; use std::io; +use crate::event::KeyString; use crate::sinks::{ prelude::*, pulsar::{encoder::PulsarEncoder, service::PulsarRequest, sink::PulsarEvent}, @@ -11,7 +12,7 @@ use crate::sinks::{ pub(super) struct PulsarMetadata { pub finalizers: EventFinalizers, pub key: Option, - pub properties: Option>, + pub properties: Option>, pub timestamp_millis: Option, pub topic: String, } diff --git a/src/sinks/pulsar/service.rs b/src/sinks/pulsar/service.rs index 8afabb3260207..a7afd635ef2d8 100644 --- a/src/sinks/pulsar/service.rs +++ b/src/sinks/pulsar/service.rs @@ -106,7 +106,7 @@ impl Service for PulsarService { let mut properties = HashMap::new(); if let Some(props) = request.metadata.properties { for (key, value) in props { - properties.insert(key, String::from_utf8_lossy(&value).to_string()); + properties.insert(key.into(), String::from_utf8_lossy(&value).to_string()); } } diff --git a/src/sinks/pulsar/sink.rs b/src/sinks/pulsar/sink.rs index 7bd26ac24f5a8..479d4455b97e0 100644 --- a/src/sinks/pulsar/sink.rs +++ b/src/sinks/pulsar/sink.rs @@ -4,13 +4,13 @@ use pulsar::{Error as PulsarError, Pulsar, TokioExecutor}; use serde::Serialize; use snafu::Snafu; use std::collections::HashMap; - -use crate::sinks::prelude::*; +use vrl::value::KeyString; use super::{ config::PulsarSinkConfig, encoder::PulsarEncoder, request_builder::PulsarRequestBuilder, service::PulsarService, util, }; +use crate::sinks::prelude::*; #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] @@ -37,7 +37,7 @@ pub(super) struct PulsarEvent { pub(super) event: Event, pub(super) topic: String, pub(super) key: Option, - pub(super) properties: Option>, + pub(super) properties: Option>, pub(super) timestamp_millis: Option, } @@ -56,7 +56,7 @@ impl ByteSizeOf for PulsarEvent { + self.properties.as_ref().map_or(0, |props| { props .iter() - .map(|(key, val)| key.capacity() + val.size_of()) + .map(|(key, val)| key.allocated_bytes() + val.size_of()) .sum() }) } diff --git a/src/sinks/pulsar/util.rs b/src/sinks/pulsar/util.rs index b14500252c8d4..1f0d45fe4e0d2 100644 --- a/src/sinks/pulsar/util.rs +++ b/src/sinks/pulsar/util.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use lookup::lookup_v2::OptionalTargetPath; use std::collections::HashMap; use vector_core::event::Event; -use vrl::value::Value; +use vrl::value::{KeyString, Value}; /// Transforms an event into a Pulsar event by rendering the required template fields. /// Returns None if there is an error whilst rendering. @@ -57,7 +57,7 @@ fn get_timestamp_millis(event: &Event) -> Option { pub(super) fn get_properties( event: &Event, properties_key: &Option, -) -> Option> { +) -> Option> { properties_key.as_ref().and_then(|properties_key| { properties_key.path.as_ref().and_then(|path| { event.maybe_as_log().and_then(|log| { diff --git a/src/sinks/sematext/metrics.rs b/src/sinks/sematext/metrics.rs index 3e09630d556f6..8f5e8f93d1c4d 100644 --- a/src/sinks/sematext/metrics.rs +++ b/src/sinks/sematext/metrics.rs @@ -15,7 +15,7 @@ use crate::{ config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, event::{ metric::{Metric, MetricValue}, - Event, + Event, KeyString, }, http::HttpClient, internal_events::{SematextMetricsEncodeEventError, SematextMetricsInvalidMetricError}, @@ -299,9 +299,9 @@ fn encode_events( EncodedEvent::new(output.freeze(), byte_size, json_byte_size) } -fn to_fields(label: String, value: f64) -> HashMap { +fn to_fields(label: String, value: f64) -> HashMap { let mut result = HashMap::new(); - result.insert(label, Field::Float(value)); + result.insert(label.into(), Field::Float(value)); result } diff --git a/src/sources/datadog_agent/traces.rs b/src/sources/datadog_agent/traces.rs index bd9099c13a2ab..f0e2665f78c60 100644 --- a/src/sources/datadog_agent/traces.rs +++ b/src/sources/datadog_agent/traces.rs @@ -12,7 +12,7 @@ use vrl::event_path; use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter, Rejection, Reply}; use crate::{ - event::{Event, TraceEvent, Value}, + event::{Event, ObjectMap, TraceEvent, Value}, sources::{ datadog_agent::{ddtrace_proto, handle_request, ApiKeyQueryParams, DatadogAgentSource}, util::ErrorMessage, @@ -275,8 +275,8 @@ fn handle_dd_trace_payload_v0( Ok(enriched_events) } -fn convert_span(dd_span: ddtrace_proto::Span) -> BTreeMap { - let mut span = BTreeMap::::new(); +fn convert_span(dd_span: ddtrace_proto::Span) -> ObjectMap { + let mut span = ObjectMap::new(); span.insert("service".into(), Value::from(dd_span.service)); span.insert("name".into(), Value::from(dd_span.name)); @@ -301,8 +301,13 @@ fn convert_span(dd_span: ddtrace_proto::Span) -> BTreeMap { dd_span .metrics .into_iter() - .map(|(k, v)| (k, NotNan::new(v).map(Value::Float).unwrap_or(Value::Null))) - .collect::>(), + .map(|(k, v)| { + ( + k.into(), + NotNan::new(v).map(Value::Float).unwrap_or(Value::Null), + ) + }) + .collect::(), ), ); span.insert("type".into(), Value::from(dd_span.r#type)); @@ -312,17 +317,17 @@ fn convert_span(dd_span: ddtrace_proto::Span) -> BTreeMap { dd_span .meta_struct .into_iter() - .map(|(k, v)| (k, Value::from(bytes::Bytes::from(v)))) - .collect::>(), + .map(|(k, v)| (k.into(), Value::from(bytes::Bytes::from(v)))) + .collect::(), ), ); span } -fn convert_tags(original_map: BTreeMap) -> BTreeMap { +fn convert_tags(original_map: BTreeMap) -> ObjectMap { original_map .into_iter() - .map(|(k, v)| (k, Value::from(v))) - .collect::>() + .map(|(k, v)| (k.into(), Value::from(v))) + .collect::() } diff --git a/src/sources/fluent/message.rs b/src/sources/fluent/message.rs index d3746839dd8b6..45c25b7d93b75 100644 --- a/src/sources/fluent/message.rs +++ b/src/sources/fluent/message.rs @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, convert::TryInto}; use chrono::{serde::ts_seconds, DateTime, TimeZone, Utc}; use ordered_float::NotNan; use serde::{Deserialize, Serialize}; -use vector_core::event::Value; +use vector_core::event::{KeyString, ObjectMap, Value}; /// Fluent msgpack messages can be encoded in one of three ways, each with and /// without options, all using arrays to encode the top-level fields. @@ -184,18 +184,18 @@ impl From for Value { .into_iter() .filter_map(|(key, value)| { key.as_str() - .map(|k| (k.to_owned(), Value::from(FluentValue(value)))) + .map(|k| (k.into(), Value::from(FluentValue(value)))) }) .collect(), ) } rmpv::Value::Ext(code, bytes) => { - let mut fields = BTreeMap::new(); + let mut fields = ObjectMap::new(); fields.insert( - String::from("msgpack_extension_code"), + KeyString::from("msgpack_extension_code"), Value::Integer(code.into()), ); - fields.insert(String::from("bytes"), Value::Bytes(bytes.into())); + fields.insert(KeyString::from("bytes"), Value::Bytes(bytes.into())); Value::Object(fields) } } diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index 3f1e3b5f1f529..8bf3496771ecb 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -664,7 +664,7 @@ impl PubsubSource { message .attributes .into_iter() - .map(|(key, value)| (key, Value::Bytes(value.into()))) + .map(|(key, value)| (key.into(), Value::Bytes(value.into()))) .collect(), ); let log_namespace = self.log_namespace; diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 91eb05c5ed399..5503bf7ddc4d3 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, io::Cursor, sync::{Arc, OnceLock}, time::Duration, @@ -29,7 +29,7 @@ use vector_core::{ config::{LegacyKey, LogNamespace}, EstimatedJsonEncodedSizeOf, }; -use vrl::value::{kind::Collection, Kind}; +use vrl::value::{kind::Collection, Kind, ObjectMap}; use crate::{ codecs::{Decoder, DecodingConfig}, @@ -543,7 +543,7 @@ impl<'a> Keys<'a> { struct ReceivedMessage { timestamp: Option>, key: Value, - headers: BTreeMap, + headers: ObjectMap, topic: String, partition: i32, offset: i64, @@ -562,12 +562,12 @@ impl ReceivedMessage { .map(|key| Value::from(Bytes::from(key.to_owned()))) .unwrap_or(Value::Null); - let mut headers_map = BTreeMap::new(); + let mut headers_map = ObjectMap::new(); if let Some(headers) = msg.headers() { for header in headers.iter() { if let Some(value) = header.value { headers_map.insert( - header.key.to_string(), + header.key.into(), Value::from(Bytes::from(value.to_owned())), ); } @@ -898,7 +898,7 @@ mod test { #[cfg(feature = "kafka-integration-tests")] #[cfg(test)] mod integration_test { - use std::time::Duration; + use std::{collections::BTreeMap, time::Duration}; use chrono::{DateTime, SubsecRound, Utc}; use futures::Stream; diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index ec3a038bd9739..5f2b8d1e215de 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -19,7 +19,7 @@ use vector_core::{ schema::Definition, }; use vrl::value::kind::Collection; -use vrl::value::Kind; +use vrl::value::{KeyString, Kind}; use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker}; use crate::{ @@ -434,7 +434,7 @@ impl TryFrom for LogstashFrameType { struct LogstashEventFrame { protocol: LogstashProtocolVersion, sequence_number: u32, - fields: BTreeMap, + fields: BTreeMap, } // Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md @@ -522,7 +522,7 @@ impl Decoder for LogstashDecoder { let sequence_number = rest.get_u32(); let pair_count = rest.get_u32(); - let mut fields: BTreeMap = BTreeMap::new(); + let mut fields = BTreeMap::::new(); for _ in 0..pair_count { if src.remaining() < 4 { return Ok(None); @@ -546,7 +546,7 @@ impl Decoder for LogstashDecoder { rest = right; fields.insert( - String::from_utf8_lossy(key).to_string(), + String::from_utf8_lossy(key).into(), String::from_utf8_lossy(value).into(), ); } @@ -585,7 +585,7 @@ impl Decoder for LogstashDecoder { let (slice, right) = rest.split_at(payload_size); rest = right; - let fields_result: Result, _> = + let fields_result: Result, _> = serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {}); let remaining = rest.remaining(); diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index fc538efad199d..cff3d8b949482 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -300,7 +300,7 @@ fn str_into_hex_bytes(s: &str) -> Vec { hex::decode(s).unwrap() } -fn vec_into_btmap(arr: Vec<(&'static str, Value)>) -> BTreeMap { +fn vec_into_btmap(arr: Vec<(&'static str, Value)>) -> ObjectMap { BTreeMap::from_iter( arr.into_iter() .map(|(k, v)| (k.to_string(), v)) diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 896d1f374b968..54af03f402b72 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -598,7 +598,7 @@ mod test { "one line".into() ); - let tls_meta: BTreeMap = btreemap!( + let tls_meta: ObjectMap = btreemap!( "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US" ); @@ -663,7 +663,7 @@ mod test { assert_eq!(log.value(), &"one line".into()); - let tls_meta: BTreeMap = btreemap!( + let tls_meta: ObjectMap = btreemap!( "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US" ); diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index 1418d77730d6f..7a822d190a24f 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -1442,7 +1442,7 @@ mod test { } } - fn structured_data_from_fields(fields: BTreeMap) -> StructuredData { + fn structured_data_from_fields(fields: ObjectMap) -> StructuredData { let mut structured_data = StructuredData::default(); for (key, value) in fields.into_iter() { diff --git a/src/sources/util/net/tcp/mod.rs b/src/sources/util/net/tcp/mod.rs index a31ba249a172d..b22e6ee155944 100644 --- a/src/sources/util/net/tcp/mod.rs +++ b/src/sources/util/net/tcp/mod.rs @@ -1,6 +1,6 @@ mod request_limiter; -use std::{collections::BTreeMap, io, mem::drop, net::SocketAddr, time::Duration}; +use std::{io, mem::drop, net::SocketAddr, time::Duration}; use bytes::Bytes; use codecs::StreamDecodingError; @@ -22,7 +22,7 @@ use vector_core::{ config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig}, EstimatedJsonEncodedSizeOf, }; -use vrl::value::Value; +use vrl::value::ObjectMap; use self::request_limiter::RequestLimiter; use super::SocketListenAddr; @@ -364,8 +364,8 @@ async fn handle_stream( if let Some(certificate_metadata) = &certificate_metadata { - let mut metadata: BTreeMap = BTreeMap::new(); - metadata.insert("subject".to_string(), certificate_metadata.subject().into()); + let mut metadata = ObjectMap::new(); + metadata.insert("subject".into(), certificate_metadata.subject().into()); for event in &mut events { let log = event.as_mut_log(); diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index f0e31cd77f5b1..fc56d00af3f8b 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -677,12 +677,12 @@ mod tests { let (topology, mut out) = create_topology(ReceiverStream::new(rx), transform_config).await; - let mut map1: BTreeMap = BTreeMap::new(); + let mut map1 = ObjectMap::new(); map1.insert("key".into(), "123".into()); let mut event1 = Event::Log(LogEvent::from("message")); event1.as_mut_log().insert("matched", map1); - let mut map2: BTreeMap = BTreeMap::new(); + let mut map2 = ObjectMap::new(); map2.insert("key".into(), 123.into()); let mut event2 = Event::Log(LogEvent::from("message")); event2.as_mut_log().insert("matched", map2); diff --git a/src/transforms/reduce/merge_strategy.rs b/src/transforms/reduce/merge_strategy.rs index e7ddd9cd80386..4990a2b580510 100644 --- a/src/transforms/reduce/merge_strategy.rs +++ b/src/transforms/reduce/merge_strategy.rs @@ -6,7 +6,7 @@ use ordered_float::NotNan; use vector_config::configurable_component; use vrl::event_path; -use crate::event::{LogEvent, Value}; +use crate::event::{KeyString, LogEvent, Value}; /// Strategies for merging events. #[configurable_component] @@ -68,7 +68,7 @@ impl ReduceValueMerger for DiscardMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), self.v); Ok(()) } @@ -94,7 +94,7 @@ impl ReduceValueMerger for RetainMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), self.v); Ok(()) } @@ -134,7 +134,7 @@ impl ReduceValueMerger for ConcatMerger { } } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Bytes(self.v.into())); Ok(()) } @@ -161,7 +161,7 @@ impl ReduceValueMerger for ConcatArrayMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Array(self.v)); Ok(()) } @@ -184,7 +184,7 @@ impl ReduceValueMerger for ArrayMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Array(self.v)); Ok(()) } @@ -216,7 +216,7 @@ impl ReduceValueMerger for LongestArrayMerger { } } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Array(self.v)); Ok(()) } @@ -248,7 +248,7 @@ impl ReduceValueMerger for ShortestArrayMerger { } } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Array(self.v)); Ok(()) } @@ -293,7 +293,7 @@ impl ReduceValueMerger for FlatUniqueMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert( event_path!(k.as_str()), Value::Array(self.v.into_iter().collect()), @@ -330,7 +330,7 @@ impl ReduceValueMerger for TimestampWindowMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert( event_path!(format!("{}_end", k).as_str()), Value::Timestamp(self.latest), @@ -394,7 +394,7 @@ impl ReduceValueMerger for AddNumbersMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(event_path!(k.as_str()), Value::Float(f)), NumberMergerValue::Int(i) => v.insert(event_path!(k.as_str()), Value::Integer(i)), @@ -453,7 +453,7 @@ impl ReduceValueMerger for MaxNumberMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(event_path!(k.as_str()), Value::Float(f)), NumberMergerValue::Int(i) => v.insert(event_path!(k.as_str()), Value::Integer(i)), @@ -512,7 +512,7 @@ impl ReduceValueMerger for MinNumberMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(event_path!(k.as_str()), Value::Float(f)), NumberMergerValue::Int(i) => v.insert(event_path!(k.as_str()), Value::Integer(i)), @@ -523,7 +523,7 @@ impl ReduceValueMerger for MinNumberMerger { pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync { fn add(&mut self, v: Value) -> Result<(), String>; - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String>; + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String>; } impl From for Box { diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index ba0ad8ffbd0be..e86bac04dd103 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -30,7 +30,7 @@ pub use merge_strategy::*; use vector_core::config::LogNamespace; use vector_core::stream::expiration_map::{map_with_expiration, Emitter}; use vrl::value::kind::Collection; -use vrl::value::Kind; +use vrl::value::{KeyString, Kind}; /// Configuration for the `reduce` transform. #[serde_as] @@ -91,7 +91,7 @@ pub struct ReduceConfig { #[configurable(metadata( docs::additional_props_description = "An individual merge strategy." ))] - pub merge_strategies: IndexMap, + pub merge_strategies: IndexMap, /// A condition used to distinguish the final event of a transaction. /// @@ -237,7 +237,7 @@ impl TransformConfig for ReduceConfig { #[derive(Debug)] struct ReduceState { events: usize, - fields: HashMap>, + fields: HashMap>, stale_since: Instant, metadata: EventMetadata, } @@ -255,7 +255,7 @@ impl ReduceState { } } - fn add_event(&mut self, e: LogEvent, strategies: &IndexMap) { + fn add_event(&mut self, e: LogEvent, strategies: &IndexMap) { let (value, metadata) = e.into_parts(); self.metadata.merge(metadata); @@ -309,7 +309,7 @@ pub struct Reduce { expire_after: Duration, flush_period: Duration, group_by: Vec, - merge_strategies: IndexMap, + merge_strategies: IndexMap, reduce_merge_states: HashMap, ends_when: Option, starts_when: Option,