diff --git a/Cargo.lock b/Cargo.lock index 4871c1dcad290..3d0d810a22f61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10405,8 +10405,7 @@ checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" [[package]] name = "vrl" version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8a93ee342590c4df0ff63961d7d76a347e0c7b6e6c0be4c001317ca1ff11b53" +source = "git+https://github.com/vectordotdev/vrl?rev=37319dfca17dc5d7637b24455199029cc30eb128#37319dfca17dc5d7637b24455199029cc30eb128" dependencies = [ "aes", "ansi_term", diff --git a/Cargo.toml b/Cargo.toml index 2f75e86713fe9..3127dc1238b3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -119,13 +119,12 @@ members = [ ] [workspace.dependencies] -vrl = { version = "0.8.1", features = ["cli"] } - pin-project = { version = "1.1.3", default-features = false } +vrl = { git = "https://github.com/vectordotdev/vrl", rev = "37319dfca17dc5d7637b24455199029cc30eb128", features = ["arbitrary", "cli", "test", "test_framework"] } [dependencies] -vrl.workspace = true pin-project.workspace = true +vrl.workspace = true # Internal libs dnsmsg-parser = { path = "lib/dnsmsg-parser", optional = true } @@ -362,7 +361,7 @@ tokio = { version = "1.33.0", features = ["test-util"] } tokio-test = "0.4.3" tower-test = "0.4.0" vector-lib = { path = "lib/vector-lib", default-features = false, features = ["vrl", "test"] } -vrl = { version = "0.8.1", features = ["cli", "test", "test_framework", "arbitrary"] } +vrl = { git = "https://github.com/vectordotdev/vrl", rev = "37319dfca17dc5d7637b24455199029cc30eb128", features = ["cli", "test", "test_framework", "arbitrary"] } wiremock = "0.5.21" zstd = { version = "0.13.0", default-features = false } diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 52e6d501391b9..cbb0b1b8a7f10 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -38,8 +38,8 @@ futures = { version = "0.3", default-features = false } indoc = { version = "2", default-features = false } tokio = { version = "1", features = ["test-util"] } similar-asserts = "1.5.0" -vrl = { version = "0.8.1", features = ["cli", "test", "test_framework", "arbitrary"] } vector-core = { path = "../vector-core", default-features = false, features = ["test"] } +vrl.workspace = true [features] syslog = ["dep:syslog_loose"] diff --git a/lib/codecs/src/decoding/format/protobuf.rs b/lib/codecs/src/decoding/format/protobuf.rs index 65be4990b0905..09b5459105c29 100644 --- a/lib/codecs/src/decoding/format/protobuf.rs +++ b/lib/codecs/src/decoding/format/protobuf.rs @@ -1,4 +1,3 @@ -use std::collections::BTreeMap; use std::path::PathBuf; use bytes::Bytes; @@ -14,7 +13,7 @@ use vector_core::{ event::Event, schema, }; -use vrl::value::Kind; +use vrl::value::{Kind, ObjectMap}; use crate::common::protobuf::get_message_descriptor; @@ -171,11 +170,11 @@ fn to_vrl( } } prost_reflect::Value::Message(v) => { - let mut obj_map = BTreeMap::new(); + let mut obj_map = ObjectMap::new(); for field_desc in v.descriptor().fields() { let field_value = v.get_field(&field_desc); let out = to_vrl(field_value.as_ref(), Some(&field_desc))?; - obj_map.insert(field_desc.name().to_string(), out); + obj_map.insert(field_desc.name().into(), out); } vrl::value::Value::from(obj_map) } @@ -206,11 +205,11 @@ fn to_vrl( field_descriptor ) })? - .to_string(), + .into(), to_vrl(kv.1, Some(&message_desc.map_entry_value_field()))?, )) }) - .collect::>>()?, + .collect::>()?, ) } else { Err("Expected valid field descriptor")? diff --git a/lib/codecs/src/decoding/format/syslog.rs b/lib/codecs/src/decoding/format/syslog.rs index 041d6a4c93cc6..33ab5d1d05f36 100644 --- a/lib/codecs/src/decoding/format/syslog.rs +++ b/lib/codecs/src/decoding/format/syslog.rs @@ -4,13 +4,12 @@ use derivative::Derivative; use lookup::{event_path, owned_value_path, OwnedTargetPath, OwnedValuePath}; use smallvec::{smallvec, SmallVec}; use std::borrow::Cow; -use std::collections::BTreeMap; use syslog_loose::{IncompleteDate, Message, ProcId, Protocol, Variant}; use vector_config::configurable_component; use vector_core::config::{LegacyKey, LogNamespace}; use vector_core::{ config::{log_schema, DataType}, - event::{Event, LogEvent, Value}, + event::{Event, LogEvent, ObjectMap, Value}, schema, }; use vrl::value::{kind::Collection, Kind}; @@ -292,7 +291,7 @@ impl Deserializer for SyslogDeserializer { log } _ => { - let mut log = LogEvent::from(Value::Object(BTreeMap::new())); + let mut log = LogEvent::from(Value::Object(ObjectMap::new())); insert_fields_from_syslog(&mut log, parsed, log_namespace); log } @@ -402,15 +401,15 @@ fn insert_metadata_fields_from_syslog( ); } - let mut sdata: BTreeMap = BTreeMap::new(); + let mut sdata = ObjectMap::new(); for element in parsed.structured_data.into_iter() { - let mut data: BTreeMap = BTreeMap::new(); + let mut data = ObjectMap::new(); for (name, value) in element.params() { - data.insert(name.to_string(), value.into()); + data.insert(name.to_string().into(), value.into()); } - sdata.insert(element.id.to_string(), data.into()); + sdata.insert(element.id.into(), data.into()); } log_namespace.insert_source_metadata( @@ -474,9 +473,9 @@ fn insert_fields_from_syslog( } for element in parsed.structured_data.into_iter() { - let mut sdata: BTreeMap = BTreeMap::new(); + let mut sdata = ObjectMap::new(); for (name, value) in element.params() { - sdata.insert(name.to_string(), value.into()); + sdata.insert(name.to_string().into(), value.into()); } log.insert(event_path!(element.id), sdata); } diff --git a/lib/codecs/src/encoding/format/csv.rs b/lib/codecs/src/encoding/format/csv.rs index cc485139fdee7..f28a21b3faea2 100644 --- a/lib/codecs/src/encoding/format/csv.rs +++ b/lib/codecs/src/encoding/format/csv.rs @@ -309,20 +309,20 @@ mod tests { use chrono::DateTime; use ordered_float::NotNan; use vector_common::btreemap; - use vector_core::event::{LogEvent, Value}; + use vector_core::event::{LogEvent, ObjectMap, Value}; use super::*; fn make_event_with_fields(field_data: Vec<(&str, &str)>) -> (Vec, Event) { let mut fields: Vec = std::vec::Vec::new(); - let mut tree = std::collections::BTreeMap::new(); + let mut tree = ObjectMap::new(); - for (field_name, field_value) in field_data.iter() { - let field = ConfigTargetPath::try_from(field_name.to_string()).unwrap(); + for (field_name, field_value) in field_data.into_iter() { + let field = ConfigTargetPath::try_from(field_name.clone()).unwrap(); fields.push(field); let field_value = Value::from(field_value.to_string()); - tree.insert(field_name.to_string().clone(), field_value); + tree.insert(field_name.into(), field_value); } let event = Event::Log(LogEvent::from(tree)); diff --git a/lib/codecs/src/encoding/format/gelf.rs b/lib/codecs/src/encoding/format/gelf.rs index d67ab918060f2..764c9adf802a0 100644 --- a/lib/codecs/src/encoding/format/gelf.rs +++ b/lib/codecs/src/encoding/format/gelf.rs @@ -8,9 +8,7 @@ use snafu::Snafu; use tokio_util::codec::Encoder; use vector_core::{ config::{log_schema, DataType}, - event::Event, - event::LogEvent, - event::Value, + event::{Event, KeyString, LogEvent, Value}, schema, }; @@ -26,7 +24,7 @@ use vector_core::{ #[derive(Debug, Snafu)] pub enum GelfSerializerError { #[snafu(display(r#"LogEvent does not contain required field: "{}""#, field))] - MissingField { field: String }, + MissingField { field: KeyString }, #[snafu(display( r#"LogEvent contains a value with an invalid type. field = "{}" type = "{}" expected type = "{}""#, field, @@ -194,9 +192,11 @@ fn coerce_field_names_and_values( _ => { // additional fields must be only word chars, dashes and periods. if !VALID_FIELD_REGEX.is_match(field) { - return MissingFieldSnafu { field } - .fail() - .map_err(|e| e.to_string().into()); + return MissingFieldSnafu { + field: field.clone(), + } + .fail() + .map_err(|e| e.to_string().into()); } // additional field values must be only strings or numbers @@ -238,20 +238,15 @@ fn to_gelf_event(log: LogEvent) -> vector_common::Result { #[cfg(test)] mod tests { - use std::collections::BTreeMap; - use crate::encoding::SerializerConfig; use super::*; use chrono::NaiveDateTime; use vector_core::event::{Event, EventMetadata}; use vrl::btreemap; - use vrl::value::Value; + use vrl::value::{ObjectMap, Value}; - fn do_serialize( - expect_success: bool, - event_fields: BTreeMap, - ) -> Option { + fn do_serialize(expect_success: bool, event_fields: ObjectMap) -> Option { let config = GelfSerializerConfig::new(); let mut serializer = config.build(); let event: Event = LogEvent::from_map(event_fields, EventMetadata::default()).into(); diff --git a/lib/codecs/src/encoding/format/protobuf.rs b/lib/codecs/src/encoding/format/protobuf.rs index a67a631edf3eb..e900be821bdd9 100644 --- a/lib/codecs/src/encoding/format/protobuf.rs +++ b/lib/codecs/src/encoding/format/protobuf.rs @@ -116,7 +116,7 @@ fn convert_value_raw( for (key, val) in o.into_iter() { match convert_value(&value_field, val) { Ok(prost_val) => { - map.insert(MapKey::String(key), prost_val); + map.insert(MapKey::String(key.into()), prost_val); } Err(e) => return Err(e), } diff --git a/lib/enrichment/src/find_enrichment_table_records.rs b/lib/enrichment/src/find_enrichment_table_records.rs index 7086709cd208a..f2ec4313b60d5 100644 --- a/lib/enrichment/src/find_enrichment_table_records.rs +++ b/lib/enrichment/src/find_enrichment_table_records.rs @@ -132,7 +132,7 @@ impl Function for FindEnrichmentTableRecords { #[derive(Debug, Clone)] pub struct FindEnrichmentTableRecordsFn { table: String, - condition: BTreeMap, + condition: BTreeMap, index: Option, select: Option>, case_sensitive: Case, @@ -203,7 +203,7 @@ mod tests { }; let tz = TimeZone::default(); - let object: Value = BTreeMap::new().into(); + let object: Value = ObjectMap::new().into(); let mut target = TargetValue { value: object, metadata: value!({}), diff --git a/lib/enrichment/src/get_enrichment_table_record.rs b/lib/enrichment/src/get_enrichment_table_record.rs index a028894d89373..ddd99c99c804b 100644 --- a/lib/enrichment/src/get_enrichment_table_record.rs +++ b/lib/enrichment/src/get_enrichment_table_record.rs @@ -124,7 +124,7 @@ impl Function for GetEnrichmentTableRecord { #[derive(Debug, Clone)] pub struct GetEnrichmentTableRecordFn { table: String, - condition: BTreeMap, + condition: BTreeMap, index: Option, select: Option>, case_sensitive: Case, diff --git a/lib/enrichment/src/lib.rs b/lib/enrichment/src/lib.rs index 625c4780900a5..52f2a547f50f1 100644 --- a/lib/enrichment/src/lib.rs +++ b/lib/enrichment/src/lib.rs @@ -7,12 +7,11 @@ pub mod tables; #[cfg(test)] mod test_util; mod vrl_util; -use std::collections::BTreeMap; use dyn_clone::DynClone; pub use tables::{TableRegistry, TableSearch}; use vrl::compiler::Function; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub struct IndexHandle(pub usize); @@ -49,7 +48,7 @@ pub trait Table: DynClone { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result, String>; + ) -> Result; /// Search the enrichment table data with the given condition. /// All conditions must match (AND). @@ -60,7 +59,7 @@ pub trait Table: DynClone { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result>, String>; + ) -> Result, String>; /// Hints to the enrichment table what data is going to be searched to allow it to index the /// data in advance. diff --git a/lib/enrichment/src/tables.rs b/lib/enrichment/src/tables.rs index fb43c3a7d0205..6f494491686f8 100644 --- a/lib/enrichment/src/tables.rs +++ b/lib/enrichment/src/tables.rs @@ -30,12 +30,12 @@ //! can be searched. use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, sync::{Arc, Mutex}, }; use arc_swap::ArcSwap; -use vrl::value::Value; +use vrl::value::ObjectMap; use super::{Condition, IndexHandle, Table}; use crate::Case; @@ -205,7 +205,7 @@ impl TableSearch { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result, String> { + ) -> Result { let tables = self.0.load(); if let Some(ref tables) = **tables { match tables.get(table) { @@ -227,7 +227,7 @@ impl TableSearch { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result>, String> { + ) -> Result, String> { let tables = self.0.load(); if let Some(ref tables) = **tables { match tables.get(table) { @@ -357,7 +357,7 @@ mod tests { registry.finish_load(); assert_eq!( - Ok(BTreeMap::from([("field".into(), Value::from("result"))])), + Ok(ObjectMap::from([("field".into(), Value::from("result"))])), tables_search.find_table_row( "dummy1", Case::Sensitive, @@ -410,8 +410,8 @@ mod tests { // After we finish load there are no tables in the list assert!(registry.table_ids().is_empty()); - let mut new_data = BTreeMap::new(); - new_data.insert("thing".to_string(), Value::Null); + let mut new_data = ObjectMap::new(); + new_data.insert("thing".into(), Value::Null); let mut tables: TableMap = HashMap::new(); tables.insert( diff --git a/lib/enrichment/src/test_util.rs b/lib/enrichment/src/test_util.rs index cf634f2102696..1eed09e200ca6 100644 --- a/lib/enrichment/src/test_util.rs +++ b/lib/enrichment/src/test_util.rs @@ -1,15 +1,15 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, sync::{Arc, Mutex}, }; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; use crate::{Case, Condition, IndexHandle, Table, TableRegistry}; #[derive(Debug, Clone)] pub(crate) struct DummyEnrichmentTable { - data: BTreeMap, + data: ObjectMap, indexes: Arc>>>, } @@ -20,12 +20,12 @@ impl DummyEnrichmentTable { pub(crate) fn new_with_index(indexes: Arc>>>) -> Self { Self { - data: BTreeMap::from([("field".to_string(), Value::from("result"))]), + data: ObjectMap::from([("field".into(), Value::from("result"))]), indexes, } } - pub(crate) fn new_with_data(data: BTreeMap) -> Self { + pub(crate) fn new_with_data(data: ObjectMap) -> Self { Self { data, indexes: Default::default(), @@ -40,7 +40,7 @@ impl Table for DummyEnrichmentTable { _condition: &[Condition], _select: Option<&[String]>, _index: Option, - ) -> Result, String> { + ) -> Result { Ok(self.data.clone()) } @@ -50,7 +50,7 @@ impl Table for DummyEnrichmentTable { _condition: &[Condition], _select: Option<&[String]>, _index: Option, - ) -> Result>, String> { + ) -> Result, String> { Ok(vec![self.data.clone()]) } diff --git a/lib/enrichment/src/vrl_util.rs b/lib/enrichment/src/vrl_util.rs index ccdebd3cdfbb1..61e7044c53c99 100644 --- a/lib/enrichment/src/vrl_util.rs +++ b/lib/enrichment/src/vrl_util.rs @@ -64,7 +64,7 @@ pub(crate) fn add_index( registry: &mut TableRegistry, tablename: &str, case: Case, - condition: &BTreeMap, + condition: &BTreeMap, ) -> std::result::Result { let fields = condition .iter() @@ -112,10 +112,8 @@ mod tests { #[test] fn add_indexes() { let mut registry = test_util::get_table_registry(); - let conditions = BTreeMap::from([( - "field".to_owned(), - expression::Literal::from("value").into(), - )]); + let conditions = + BTreeMap::from([("field".into(), expression::Literal::from("value").into())]); let index = add_index(&mut registry, "dummy1", Case::Insensitive, &conditions).unwrap(); assert_eq!(IndexHandle(0), index); diff --git a/lib/opentelemetry-proto/src/convert.rs b/lib/opentelemetry-proto/src/convert.rs index 875be35d4dd9c..d1f91798f44c6 100644 --- a/lib/opentelemetry-proto/src/convert.rs +++ b/lib/opentelemetry-proto/src/convert.rs @@ -2,12 +2,11 @@ use bytes::Bytes; use chrono::{DateTime, TimeZone, Utc}; use lookup::path; use ordered_float::NotNan; -use std::collections::BTreeMap; use vector_core::{ config::{log_schema, LegacyKey, LogNamespace}, event::{Event, LogEvent}, }; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; use super::proto::{ common::v1::{any_value::Value as PBValue, KeyValue}, @@ -73,10 +72,14 @@ fn kv_list_into_value(arr: Vec) -> Value { Value::Object( arr.into_iter() .filter_map(|kv| { - kv.value - .map(|av| (kv.key, av.value.map(Into::into).unwrap_or(Value::Null))) + kv.value.map(|av| { + ( + kv.key.into(), + av.value.map(Into::into).unwrap_or(Value::Null), + ) + }) }) - .collect::>(), + .collect::(), ) } diff --git a/lib/vector-common/src/byte_size_of.rs b/lib/vector-common/src/byte_size_of.rs index 0b984ac41dfa3..b0de324465f86 100644 --- a/lib/vector-common/src/byte_size_of.rs +++ b/lib/vector-common/src/byte_size_of.rs @@ -7,7 +7,7 @@ use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Utc}; use serde_json::{value::RawValue, Value as JsonValue}; use smallvec::SmallVec; -use vrl::value::Value; +use vrl::value::{KeyString, Value}; pub trait ByteSizeOf { /// Returns the in-memory size of this type @@ -58,6 +58,12 @@ impl ByteSizeOf for String { } } +impl ByteSizeOf for KeyString { + fn allocated_bytes(&self) -> usize { + self.len() + } +} + impl<'a> ByteSizeOf for &'a str { fn allocated_bytes(&self) -> usize { 0 diff --git a/lib/vector-config/src/stdlib.rs b/lib/vector-config/src/stdlib.rs index 7958c42d62609..ad3ae47aac32d 100644 --- a/lib/vector-config/src/stdlib.rs +++ b/lib/vector-config/src/stdlib.rs @@ -14,6 +14,7 @@ use std::{ use indexmap::IndexMap; use serde_json::{Number, Value}; use vector_config_common::{attributes::CustomAttribute, constants, validation::Validation}; +use vrl::value::KeyString; use crate::{ num::ConfigurableNumber, @@ -113,6 +114,22 @@ impl ToValue for String { } } +impl Configurable for KeyString { + fn metadata() -> Metadata { + Metadata::with_transparent(true) + } + + fn generate_schema(_: &RefCell) -> Result { + Ok(generate_string_schema()) + } +} + +impl ToValue for KeyString { + fn to_value(&self) -> Value { + Value::String(self.clone().into()) + } +} + impl Configurable for char { fn metadata() -> Metadata { let mut metadata = Metadata::with_transparent(true); diff --git a/lib/vector-config/src/str.rs b/lib/vector-config/src/str.rs index d1f303158d4d4..223a6019f43ca 100644 --- a/lib/vector-config/src/str.rs +++ b/lib/vector-config/src/str.rs @@ -1,3 +1,5 @@ +use vrl::value::KeyString; + use crate::Configurable; /// A string-like type that can be represented in a Vector configuration. @@ -10,3 +12,5 @@ use crate::Configurable; pub trait ConfigurableString: Configurable + ToString {} impl ConfigurableString for String {} + +impl ConfigurableString for KeyString {} diff --git a/lib/vector-core/Cargo.toml b/lib/vector-core/Cargo.toml index 73b52d39a7bdc..f3a08dc5438f9 100644 --- a/lib/vector-core/Cargo.toml +++ b/lib/vector-core/Cargo.toml @@ -91,7 +91,6 @@ rand = "0.8.5" rand_distr = "0.4.3" tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt", "ansi", "registry"] } vector-common = { path = "../vector-common", default-features = false, features = ["test"] } -vrl = { version = "0.8.1", features = ["cli", "test", "test_framework", "arbitrary"] } [features] api = ["dep:async-graphql"] diff --git a/lib/vector-core/src/event/discriminant.rs b/lib/vector-core/src/event/discriminant.rs index 98ff1d996ba12..b1bd9589ebb81 100644 --- a/lib/vector-core/src/event/discriminant.rs +++ b/lib/vector-core/src/event/discriminant.rs @@ -1,9 +1,6 @@ -use std::{ - collections::BTreeMap, - hash::{Hash, Hasher}, -}; +use std::hash::{Hash, Hasher}; -use super::{LogEvent, Value}; +use super::{LogEvent, ObjectMap, Value}; // TODO: if we had `Value` implement `Eq` and `Hash`, the implementation here // would be much easier. The issue is with `f64` type. We should consider using @@ -98,7 +95,7 @@ fn array_eq(this: &[Value], other: &[Value]) -> bool { .all(|(first, second)| value_eq(first, second)) } -fn map_eq(this: &BTreeMap, other: &BTreeMap) -> bool { +fn map_eq(this: &ObjectMap, other: &ObjectMap) -> bool { if this.len() != other.len() { return false; } @@ -150,7 +147,7 @@ fn hash_array(hasher: &mut H, array: &[Value]) { } } -fn hash_map(hasher: &mut H, map: &BTreeMap) { +fn hash_map(hasher: &mut H, map: &ObjectMap) { for (key, val) in map { hasher.write(key.as_bytes()); hash_value(hasher, val); diff --git a/lib/vector-core/src/event/estimated_json_encoded_size_of.rs b/lib/vector-core/src/event/estimated_json_encoded_size_of.rs index 9bfa3bf50a8b7..2e848e7c84648 100644 --- a/lib/vector-core/src/event/estimated_json_encoded_size_of.rs +++ b/lib/vector-core/src/event/estimated_json_encoded_size_of.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, Timelike, Utc}; use ordered_float::NotNan; use smallvec::SmallVec; use vector_common::json_size::JsonSize; -use vrl::value::Value; +use vrl::value::{KeyString, Value}; const NULL_SIZE: JsonSize = JsonSize::new(4); const TRUE_SIZE: JsonSize = JsonSize::new(4); @@ -100,6 +100,12 @@ impl EstimatedJsonEncodedSizeOf for String { } } +impl EstimatedJsonEncodedSizeOf for KeyString { + fn estimated_json_encoded_size_of(&self) -> JsonSize { + self.as_str().estimated_json_encoded_size_of() + } +} + impl EstimatedJsonEncodedSizeOf for Bytes { fn estimated_json_encoded_size_of(&self) -> JsonSize { JsonSize::new(QUOTES_SIZE + self.len()) diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 25306f350b3ea..93e45cb17a55f 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use chrono::Utc; use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, convert::{TryFrom, TryInto}, fmt::Debug, iter::FromIterator, @@ -11,9 +11,7 @@ use std::{ }; use crossbeam_utils::atomic::AtomicCell; -use lookup::lookup_v2::TargetPath; -use lookup::PathPrefix; -use lookup::{metadata_path, path}; +use lookup::{lookup_v2::TargetPath, metadata_path, path, PathPrefix}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize, Serializer}; use vector_common::{ @@ -30,12 +28,12 @@ use super::{ estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf, finalization::{BatchNotifier, EventFinalizer}, metadata::EventMetadata, - util, - util::log::{all_fields, all_metadata_fields}, - EventFinalizers, Finalizable, MaybeAsLogMut, Value, + util, EventFinalizers, Finalizable, KeyString, ObjectMap, Value, }; use crate::config::LogNamespace; use crate::config::{log_schema, telemetry}; +use crate::event::util::log::{all_fields, all_metadata_fields}; +use crate::event::MaybeAsLogMut; static VECTOR_SOURCE_TYPE_PATH: Lazy> = Lazy::new(|| { Some(OwnedTargetPath::metadata(owned_value_path!( @@ -264,8 +262,8 @@ impl LogEvent { } } - /// Create a `LogEvent` from a `BTreeMap` and `EventMetadata` - pub fn from_map(map: BTreeMap, metadata: EventMetadata) -> Self { + /// Create a `LogEvent` from an `ObjectMap` and `EventMetadata` + pub fn from_map(map: ObjectMap, metadata: EventMetadata) -> Self { let inner = Arc::new(Inner::from(Value::Object(map))); Self { inner, metadata } } @@ -415,7 +413,7 @@ impl LogEvent { } } - pub fn keys(&self) -> Option + '_> { + pub fn keys(&self) -> Option + '_> { match &self.inner.fields { Value::Object(map) => Some(util::log::keys(map)), _ => None, @@ -424,7 +422,9 @@ impl LogEvent { /// If the event root value is a map, build and return an iterator to event field and value pairs. /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods. - pub fn all_event_fields(&self) -> Option + Serialize> { + pub fn all_event_fields( + &self, + ) -> Option + Serialize> { self.as_map().map(all_fields) } @@ -432,7 +432,7 @@ impl LogEvent { /// TODO: Ideally this should return target paths to be consistent with other `LogEvent` methods. pub fn all_metadata_fields( &self, - ) -> Option + Serialize> { + ) -> Option + Serialize> { match self.metadata.value() { Value::Object(metadata_map) => Some(metadata_map).map(all_metadata_fields), _ => None, @@ -441,7 +441,7 @@ impl LogEvent { /// Returns an iterator of all fields if the value is an Object. Otherwise, /// a single field is returned with a "message" key - pub fn convert_to_fields(&self) -> impl Iterator + Serialize { + pub fn convert_to_fields(&self) -> impl Iterator + Serialize { if let Some(map) = self.as_map() { util::log::all_fields(map) } else { @@ -457,14 +457,14 @@ impl LogEvent { } } - pub fn as_map(&self) -> Option<&BTreeMap> { + pub fn as_map(&self) -> Option<&ObjectMap> { match self.value() { Value::Object(map) => Some(map), _ => None, } } - pub fn as_map_mut(&mut self) -> Option<&mut BTreeMap> { + pub fn as_map_mut(&mut self) -> Option<&mut ObjectMap> { match self.value_mut() { Value::Object(map) => Some(map), _ => None, @@ -634,16 +634,16 @@ impl From for LogEvent { } } -impl From> for LogEvent { - fn from(map: BTreeMap) -> Self { +impl From for LogEvent { + fn from(map: ObjectMap) -> Self { Self::from_parts(Value::Object(map), EventMetadata::default()) } } -impl From> for LogEvent { - fn from(map: HashMap) -> Self { +impl From> for LogEvent { + fn from(map: HashMap) -> Self { Self::from_parts( - Value::Object(map.into_iter().collect::>()), + Value::Object(map.into_iter().collect::()), EventMetadata::default(), ) } @@ -657,8 +657,8 @@ impl TryFrom for LogEvent { serde_json::Value::Object(fields) => Ok(LogEvent::from( fields .into_iter() - .map(|(k, v)| (k, v.into())) - .collect::>(), + .map(|(k, v)| (k.into(), v.into())) + .collect::(), )), _ => Err(crate::Error::from( "Attempted to convert non-Object JSON into a LogEvent.", @@ -1113,14 +1113,14 @@ mod test { log.insert("a", 0); log.insert("a.b", 1); log.insert("c", 2); - let actual: Vec<(String, Value)> = log + let actual: Vec<(KeyString, Value)> = log .all_event_fields() .unwrap() .map(|(s, v)| (s, v.clone())) .collect(); assert_eq!( actual, - vec![("a.b".to_string(), 1.into()), ("c".to_string(), 2.into())] + vec![("a.b".into(), 1.into()), ("c".into(), 2.into())] ); } @@ -1130,14 +1130,14 @@ mod test { log.insert("%a", 0); log.insert("%a.b", 1); log.insert("%c", 2); - let actual: Vec<(String, Value)> = log + let actual: Vec<(KeyString, Value)> = log .all_metadata_fields() .unwrap() .map(|(s, v)| (s, v.clone())) .collect(); assert_eq!( actual, - vec![("%a.b".to_string(), 1.into()), ("%c".to_string(), 2.into())] + vec![("%a.b".into(), 1.into()), ("%c".into(), 2.into())] ); } } diff --git a/lib/vector-core/src/event/metadata.rs b/lib/vector-core/src/event/metadata.rs index 0030464ac06c5..a9ae6e2d16fd1 100644 --- a/lib/vector-core/src/event/metadata.rs +++ b/lib/vector-core/src/event/metadata.rs @@ -6,10 +6,10 @@ use serde::{Deserialize, Serialize}; use vector_common::{byte_size_of::ByteSizeOf, config::ComponentKey, EventDataEq}; use vrl::{ compiler::SecretTarget, - value::{Kind, Value}, + value::{KeyString, Kind, Value}, }; -use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus}; +use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, ObjectMap}; use crate::{ config::{LogNamespace, OutputId}, schema, @@ -59,7 +59,7 @@ pub struct EventMetadata { /// we need to ensure it is still available later on for emitting metrics tagged by the service. /// This field could almost be keyed by `&'static str`, but because it needs to be deserializable /// we have to use `String`. - dropped_fields: BTreeMap, + dropped_fields: ObjectMap, /// Metadata to track the origin of metrics. This is always `None` for log and trace events. /// Only a small set of Vector sources and transforms explicitly set this field. @@ -110,7 +110,7 @@ impl DatadogMetricOriginMetadata { } fn default_metadata_value() -> Value { - Value::Object(BTreeMap::new()) + Value::Object(ObjectMap::new()) } impl EventMetadata { @@ -200,7 +200,7 @@ impl EventMetadata { /// There is currently no way to remove a field from this list, so if a field is dropped /// and then the field is re-added with a new value - the dropped value will still be /// retrieved. - pub fn add_dropped_field(&mut self, meaning: String, value: Value) { + pub fn add_dropped_field(&mut self, meaning: KeyString, value: Value) { self.dropped_fields.insert(meaning, value); } @@ -218,14 +218,14 @@ impl EventMetadata { impl Default for EventMetadata { fn default() -> Self { Self { - value: Value::Object(BTreeMap::new()), + value: Value::Object(ObjectMap::new()), secrets: Secrets::new(), finalizers: Default::default(), schema_definition: default_schema_definition(), source_id: None, source_type: None, upstream_id: None, - dropped_fields: BTreeMap::new(), + dropped_fields: ObjectMap::new(), datadog_origin_metadata: None, } } diff --git a/lib/vector-core/src/event/mod.rs b/lib/vector-core/src/event/mod.rs index 49c002fb2e037..27c779212e04b 100644 --- a/lib/vector-core/src/event/mod.rs +++ b/lib/vector-core/src/event/mod.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, convert::TryInto, fmt::Debug, sync::Arc}; +use std::{convert::TryInto, fmt::Debug, sync::Arc}; pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray}; pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf; @@ -17,7 +17,7 @@ use vector_common::{ byte_size_of::ByteSizeOf, config::ComponentKey, finalization, internal_event::TaggedEventsSent, json_size::JsonSize, request_metadata::GetEventCountTags, EventDataEq, }; -pub use vrl::value::Value; +pub use vrl::value::{KeyString, ObjectMap, Value}; #[cfg(feature = "vrl")] pub use vrl_target::{TargetEvents, VrlTarget}; @@ -348,8 +348,8 @@ impl Event { serde_json::Value::Object(fields) => Ok(LogEvent::from( fields .into_iter() - .map(|(k, v)| (k, v.into())) - .collect::>(), + .map(|(k, v)| (k.into(), v.into())) + .collect::(), ) .into()), _ => Err(crate::Error::from( diff --git a/lib/vector-core/src/event/proto.rs b/lib/vector-core/src/event/proto.rs index d767306343cb6..5359163e3f5a1 100644 --- a/lib/vector-core/src/event/proto.rs +++ b/lib/vector-core/src/event/proto.rs @@ -1,10 +1,11 @@ +use std::collections::BTreeMap; use std::sync::Arc; use chrono::TimeZone; use ordered_float::NotNan; -use super::{BTreeMap, MetricTags, WithMetadata}; -use crate::metrics::AgentDDSketch; +use super::{MetricTags, WithMetadata}; +use crate::{event, metrics::AgentDDSketch}; #[allow(warnings, clippy::all, clippy::pedantic)] mod proto_event { @@ -13,7 +14,7 @@ mod proto_event { pub use event_wrapper::Event; pub use metric::Value as MetricValue; pub use proto_event::*; -use vrl::value::Value as VrlValue; +use vrl::value::{ObjectMap, Value as VrlValue}; use super::{array, metric::MetricSketch, EventMetadata}; @@ -109,8 +110,8 @@ impl From for super::LogEvent { let fields = log .fields .into_iter() - .filter_map(|(k, v)| decode_value(v).map(|value| (k, value))) - .collect::>(); + .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value))) + .collect::(); Self::from_map(fields, metadata) } @@ -134,8 +135,8 @@ impl From for super::TraceEvent { let fields = trace .fields .into_iter() - .filter_map(|(k, v)| decode_value(v).map(|value| (k, value))) - .collect::>(); + .filter_map(|(k, v)| decode_value(v).map(|value| (k.into(), value))) + .collect::(); Self::from(super::LogEvent::from_map(fields, metadata)) } @@ -302,7 +303,7 @@ impl From for WithMetadata { // using only "fields" to prevent having to use the dummy value let fields = fields .into_iter() - .map(|(k, v)| (k, encode_value(v))) + .map(|(k, v)| (k.into(), encode_value(v))) .collect::>(); (fields, None) @@ -333,7 +334,7 @@ impl From for WithMetadata { let (fields, metadata) = trace.into_parts(); let fields = fields .into_iter() - .map(|(k, v)| (k, encode_value(v))) + .map(|(k, v)| (k.into(), encode_value(v))) .collect::>(); #[allow(deprecated)] @@ -700,9 +701,9 @@ fn decode_value(input: Value) -> Option { fn decode_map(fields: BTreeMap) -> Option { fields .into_iter() - .map(|(key, value)| decode_value(value).map(|value| (key, value))) - .collect::>>() - .map(super::Value::Object) + .map(|(key, value)| decode_value(value).map(|value| (key.into(), value))) + .collect::>() + .map(event::Value::Object) } fn decode_array(items: Vec) -> Option { @@ -732,11 +733,11 @@ fn encode_value(value: super::Value) -> Value { } } -fn encode_map(fields: BTreeMap) -> ValueMap { +fn encode_map(fields: ObjectMap) -> ValueMap { ValueMap { fields: fields .into_iter() - .map(|(key, value)| (key, encode_value(value))) + .map(|(key, value)| (key.into(), encode_value(value))) .collect(), } } diff --git a/lib/vector-core/src/event/test/common.rs b/lib/vector-core/src/event/test/common.rs index 0f7d00908e152..d258e1fa3b7ff 100644 --- a/lib/vector-core/src/event/test/common.rs +++ b/lib/vector-core/src/event/test/common.rs @@ -1,10 +1,8 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - iter, -}; +use std::{collections::BTreeSet, iter}; use chrono::{DateTime, NaiveDateTime, Utc}; use quickcheck::{empty_shrinker, Arbitrary, Gen}; +use vrl::value::{ObjectMap, Value}; use super::super::{ metric::{ @@ -12,7 +10,6 @@ use super::super::{ Quantile, Sample, }, Event, EventMetadata, LogEvent, Metric, MetricKind, MetricValue, StatisticKind, TraceEvent, - Value, }; use crate::metrics::AgentDDSketch; @@ -82,7 +79,7 @@ impl Arbitrary for Event { impl Arbitrary for LogEvent { fn arbitrary(g: &mut Gen) -> Self { let mut gen = Gen::new(MAX_MAP_SIZE); - let map: BTreeMap = BTreeMap::arbitrary(&mut gen); + let map: ObjectMap = ObjectMap::arbitrary(&mut gen); let metadata: EventMetadata = EventMetadata::arbitrary(g); LogEvent::from_map(map, metadata) } diff --git a/lib/vector-core/src/event/test/mod.rs b/lib/vector-core/src/event/test/mod.rs index d1dda8523f6ab..c1030d64e3aca 100644 --- a/lib/vector-core/src/event/test/mod.rs +++ b/lib/vector-core/src/event/test/mod.rs @@ -43,9 +43,9 @@ fn event_iteration_order() { assert_eq!( collected, vec![ - (String::from("YRjhxXcg"), &Value::from("nw8iM5Jr")), - (String::from("lZDfzKIL"), &Value::from("tOVrjveM")), - (String::from("o9amkaRY"), &Value::from("pGsfG7Nr")), + ("YRjhxXcg".into(), &Value::from("nw8iM5Jr")), + ("lZDfzKIL".into(), &Value::from("tOVrjveM")), + ("o9amkaRY".into(), &Value::from("pGsfG7Nr")), ] ); } diff --git a/lib/vector-core/src/event/trace.rs b/lib/vector-core/src/event/trace.rs index 851c504592d86..d52295822e323 100644 --- a/lib/vector-core/src/event/trace.rs +++ b/lib/vector-core/src/event/trace.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, fmt::Debug}; +use std::fmt::Debug; use lookup::lookup_v2::TargetPath; use serde::{Deserialize, Serialize}; @@ -11,7 +11,7 @@ use vrl::path::PathParseError; use super::{ BatchNotifier, EstimatedJsonEncodedSizeOf, EventFinalizer, EventFinalizers, EventMetadata, - Finalizable, LogEvent, Value, + Finalizable, LogEvent, ObjectMap, Value, }; /// Traces are a newtype of `LogEvent` @@ -23,13 +23,13 @@ impl TraceEvent { /// # Panics /// /// Panics if the fields of the `TraceEvent` are not a `Value::Map`. - pub fn into_parts(self) -> (BTreeMap, EventMetadata) { + pub fn into_parts(self) -> (ObjectMap, EventMetadata) { let (value, metadata) = self.0.into_parts(); let map = value.into_object().expect("inner value must be a map"); (map, metadata) } - pub fn from_parts(fields: BTreeMap, metadata: EventMetadata) -> Self { + pub fn from_parts(fields: ObjectMap, metadata: EventMetadata) -> Self { Self(LogEvent::from_map(fields, metadata)) } @@ -63,11 +63,11 @@ impl TraceEvent { Self(self.0.with_batch_notifier_option(batch)) } - /// Convert a `TraceEvent` into a `BTreeMap` of it's fields + /// Convert a `TraceEvent` into an `ObjectMap` of it's fields /// # Panics /// /// Panics if the fields of the `TraceEvent` are not a `Value::Map`. - pub fn as_map(&self) -> &BTreeMap { + pub fn as_map(&self) -> &ObjectMap { self.0.as_map().expect("inner value must be a map") } @@ -120,8 +120,8 @@ impl From for TraceEvent { } } -impl From> for TraceEvent { - fn from(map: BTreeMap) -> Self { +impl From for TraceEvent { + fn from(map: ObjectMap) -> Self { Self(map.into()) } } diff --git a/lib/vector-core/src/event/util/log/all_fields.rs b/lib/vector-core/src/event/util/log/all_fields.rs index 9cc9cc2043b31..94efa67178a2b 100644 --- a/lib/vector-core/src/event/util/log/all_fields.rs +++ b/lib/vector-core/src/event/util/log/all_fields.rs @@ -1,23 +1,19 @@ -use std::{ - collections::{btree_map, BTreeMap}, - fmt::Write as _, - iter, slice, -}; +use std::{collections::btree_map, fmt::Write as _, iter, slice}; use serde::{Serialize, Serializer}; use vrl::path::PathPrefix; -use super::Value; +use crate::event::{KeyString, ObjectMap, Value}; /// Iterates over all paths in form `a.b[0].c[1]` in alphabetical order /// and their corresponding values. -pub fn all_fields(fields: &BTreeMap) -> FieldsIter { +pub fn all_fields(fields: &ObjectMap) -> FieldsIter { FieldsIter::new(fields) } /// Same functionality as `all_fields` but it prepends a character that denotes the /// path type. -pub fn all_metadata_fields(fields: &BTreeMap) -> FieldsIter { +pub fn all_metadata_fields(fields: &ObjectMap) -> FieldsIter { FieldsIter::new_with_prefix(PathPrefix::Metadata, fields) } @@ -29,13 +25,13 @@ pub fn all_fields_non_object_root(value: &Value) -> FieldsIter { #[derive(Clone, Debug)] enum LeafIter<'a> { Root((&'a Value, bool)), - Map(btree_map::Iter<'a, String, Value>), + Map(btree_map::Iter<'a, KeyString, Value>), Array(iter::Enumerate>), } #[derive(Clone, Copy)] enum PathComponent<'a> { - Key(&'a String), + Key(&'a KeyString), Index(usize), } @@ -54,7 +50,7 @@ pub struct FieldsIter<'a> { impl<'a> FieldsIter<'a> { // TODO deprecate this in favor of `new_with_prefix`. - fn new(fields: &'a BTreeMap) -> FieldsIter<'a> { + fn new(fields: &'a ObjectMap) -> FieldsIter<'a> { FieldsIter { path_prefix: None, stack: vec![LeafIter::Map(fields.iter())], @@ -62,10 +58,7 @@ impl<'a> FieldsIter<'a> { } } - fn new_with_prefix( - path_prefix: PathPrefix, - fields: &'a BTreeMap, - ) -> FieldsIter<'a> { + fn new_with_prefix(path_prefix: PathPrefix, fields: &'a ObjectMap) -> FieldsIter<'a> { FieldsIter { path_prefix: Some(path_prefix), stack: vec![LeafIter::Map(fields.iter())], @@ -104,7 +97,7 @@ impl<'a> FieldsIter<'a> { self.path.pop(); } - fn make_path(&mut self, component: PathComponent<'a>) -> String { + fn make_path(&mut self, component: PathComponent<'a>) -> KeyString { let mut res = match self.path_prefix { None => String::new(), Some(prefix) => match prefix { @@ -115,7 +108,7 @@ impl<'a> FieldsIter<'a> { let mut path_iter = self.path.iter().chain(iter::once(&component)).peekable(); loop { match path_iter.next() { - None => return res, + None => break res.into(), Some(PathComponent::Key(key)) => { if key.contains('.') { res.push_str(&key.replace('.', "\\.")); @@ -135,7 +128,7 @@ impl<'a> FieldsIter<'a> { } impl<'a> Iterator for FieldsIter<'a> { - type Item = (String, &'a Value); + type Item = (KeyString, &'a Value); fn next(&mut self) -> Option { loop { @@ -161,9 +154,9 @@ impl<'a> Iterator for FieldsIter<'a> { } }, Some(LeafIter::Root((value, visited))) => { - let result = (!*visited).then(|| ("message".to_owned(), *value)); + let result = (!*visited).then(|| ("message".into(), *value)); *visited = true; - return result; + break result; } }; } @@ -250,7 +243,7 @@ mod test { ("a.array[3][0]", Value::Integer(2)), ("a.b.c", Value::Integer(5)), ("a\\.b\\.c", Value::Integer(6)), - ("d", Value::Object(BTreeMap::new())), + ("d", Value::Object(ObjectMap::new())), ("e", Value::Array(Vec::new())), ] .into_iter() @@ -265,7 +258,7 @@ mod test { fn test_non_object_root() { let value = Value::Integer(3); let collected: Vec<_> = all_fields_non_object_root(&value) - .map(|(k, v)| (k, v.clone())) + .map(|(k, v)| (k.into(), v.clone())) .collect(); assert_eq!(collected, vec![("message".to_owned(), value)]); } diff --git a/lib/vector-core/src/event/util/log/keys.rs b/lib/vector-core/src/event/util/log/keys.rs index 9139d6a3c3b59..9a5a41c36eaac 100644 --- a/lib/vector-core/src/event/util/log/keys.rs +++ b/lib/vector-core/src/event/util/log/keys.rs @@ -1,11 +1,10 @@ -use std::collections::BTreeMap; - -use super::{all_fields, Value}; +use super::all_fields; +use crate::event::{KeyString, ObjectMap}; /// Iterates over all paths in form `a.b[0].c[1]` in alphabetical order. /// It is implemented as a wrapper around `all_fields` to reduce code /// duplication. -pub fn keys(fields: &BTreeMap) -> impl Iterator + '_ { +pub fn keys(fields: &ObjectMap) -> impl Iterator + '_ { all_fields(fields).map(|(k, _)| k) } @@ -24,7 +23,7 @@ mod test { })); let expected: Vec<_> = vec!["field1", "field2", "field3"] .into_iter() - .map(String::from) + .map(KeyString::from) .collect(); let collected: Vec<_> = keys(&fields).collect(); @@ -53,7 +52,7 @@ mod test { "a.b.c", ] .into_iter() - .map(String::from) + .map(KeyString::from) .collect(); let collected: Vec<_> = keys(&fields).collect(); diff --git a/lib/vector-core/src/event/util/log/mod.rs b/lib/vector-core/src/event/util/log/mod.rs index dc619db32c09d..b331397bcc774 100644 --- a/lib/vector-core/src/event/util/log/mod.rs +++ b/lib/vector-core/src/event/util/log/mod.rs @@ -4,17 +4,13 @@ mod keys; pub use all_fields::{all_fields, all_fields_non_object_root, all_metadata_fields}; pub use keys::keys; -use super::Value; - #[cfg(test)] mod test { - use std::collections::BTreeMap; - use serde_json::Value as JsonValue; - use super::Value; + use crate::event::{ObjectMap, Value}; - pub(crate) fn fields_from_json(json_value: JsonValue) -> BTreeMap { + pub(crate) fn fields_from_json(json_value: JsonValue) -> ObjectMap { match Value::from(json_value) { Value::Object(map) => map, something => panic!("Expected a map, got {something:?}"), diff --git a/lib/vector-core/src/event/util/mod.rs b/lib/vector-core/src/event/util/mod.rs index 952cbfe714256..f4ee9bc8d0d45 100644 --- a/lib/vector-core/src/event/util/mod.rs +++ b/lib/vector-core/src/event/util/mod.rs @@ -1,3 +1 @@ pub mod log; - -use super::Value; diff --git a/lib/vector-core/src/event/vrl_target.rs b/lib/vector-core/src/event/vrl_target.rs index fd94f307fd245..a528230abb146 100644 --- a/lib/vector-core/src/event/vrl_target.rs +++ b/lib/vector-core/src/event/vrl_target.rs @@ -7,7 +7,7 @@ use snafu::Snafu; use vrl::compiler::value::VrlValueConvert; use vrl::compiler::{ProgramInfo, SecretTarget, Target}; use vrl::prelude::Collection; -use vrl::value::{Kind, Value}; +use vrl::value::{Kind, ObjectMap, Value}; use super::{metric::TagValue, Event, EventMetadata, LogEvent, Metric, MetricKind, TraceEvent}; use crate::config::{log_schema, LogNamespace}; @@ -293,7 +293,7 @@ impl Target for VrlTarget { metric.remove_tags(); for (field, value) in &value { set_metric_tag_values( - field.as_str().to_owned(), + field[..].into(), value, metric, *multi_value_tags, @@ -538,7 +538,7 @@ fn target_get_mut_metric<'a>( /// This structure is partially populated based on the fields accessed by /// the VRL program as informed by `ProgramInfo`. fn precompute_metric_value(metric: &Metric, info: &ProgramInfo) -> Value { - let mut map = BTreeMap::default(); + let mut map = ObjectMap::default(); let mut set_name = false; let mut set_kind = false; @@ -551,36 +551,36 @@ fn precompute_metric_value(metric: &Metric, info: &ProgramInfo) -> Value { // Accessing a root path requires us to pre-populate all fields. if target_path == &OwnedTargetPath::event_root() { if !set_name { - map.insert("name".to_owned(), metric.name().to_owned().into()); + map.insert("name".into(), metric.name().to_owned().into()); } if !set_kind { - map.insert("kind".to_owned(), metric.kind().into()); + map.insert("kind".into(), metric.kind().into()); } if !set_type { - map.insert("type".to_owned(), metric.value().clone().into()); + map.insert("type".into(), metric.value().clone().into()); } if !set_namespace { if let Some(namespace) = metric.namespace() { - map.insert("namespace".to_owned(), namespace.to_owned().into()); + map.insert("namespace".into(), namespace.to_owned().into()); } } if !set_timestamp { if let Some(timestamp) = metric.timestamp() { - map.insert("timestamp".to_owned(), timestamp.into()); + map.insert("timestamp".into(), timestamp.into()); } } if !set_tags { if let Some(tags) = metric.tags().cloned() { map.insert( - "tags".to_owned(), + "tags".into(), tags.into_iter_single() - .map(|(tag, value)| (tag, value.into())) - .collect::>() + .map(|(tag, value)| (tag.into(), value.into())) + .collect::() .into(), ); } @@ -595,38 +595,38 @@ fn precompute_metric_value(metric: &Metric, info: &ProgramInfo) -> Value { match field.as_ref() { "name" if !set_name => { set_name = true; - map.insert("name".to_owned(), metric.name().to_owned().into()); + map.insert("name".into(), metric.name().to_owned().into()); } "kind" if !set_kind => { set_kind = true; - map.insert("kind".to_owned(), metric.kind().into()); + map.insert("kind".into(), metric.kind().into()); } "type" if !set_type => { set_type = true; - map.insert("type".to_owned(), metric.value().clone().into()); + map.insert("type".into(), metric.value().clone().into()); } "namespace" if !set_namespace && metric.namespace().is_some() => { set_namespace = true; map.insert( - "namespace".to_owned(), + "namespace".into(), metric.namespace().unwrap().to_owned().into(), ); } "timestamp" if !set_timestamp && metric.timestamp().is_some() => { set_timestamp = true; - map.insert("timestamp".to_owned(), metric.timestamp().unwrap().into()); + map.insert("timestamp".into(), metric.timestamp().unwrap().into()); } "tags" if !set_tags && metric.tags().is_some() => { set_tags = true; map.insert( - "tags".to_owned(), + "tags".into(), metric .tags() .cloned() .unwrap() .into_iter_single() - .map(|(tag, value)| (tag, value.into())) - .collect::>() + .map(|(tag, value)| (tag.into(), value.into())) + .collect::() .into(), ); } @@ -790,7 +790,7 @@ mod test { ]; for (value, path, expect) in cases { - let value: BTreeMap = value; + let value: ObjectMap = value; let info = ProgramInfo { fallible: false, abortable: false, @@ -894,7 +894,7 @@ mod test { ]; for (object, path, value, expect, result) in cases { - let object: BTreeMap = object; + let object: ObjectMap = object; let info = ProgramInfo { fallible: false, abortable: false, diff --git a/lib/vector-lookup/src/lookup_v2/mod.rs b/lib/vector-lookup/src/lookup_v2/mod.rs index de95977844d62..b92acd7916c0c 100644 --- a/lib/vector-lookup/src/lookup_v2/mod.rs +++ b/lib/vector-lookup/src/lookup_v2/mod.rs @@ -7,6 +7,7 @@ pub use vrl::path::{ parse_target_path, parse_value_path, BorrowedSegment, OwnedSegment, OwnedTargetPath, OwnedValuePath, PathConcat, PathParseError, PathPrefix, TargetPath, ValuePath, }; +use vrl::value::KeyString; /// A wrapper around `OwnedValuePath` that allows it to be used in Vector config. /// This requires a valid path to be used. If you want to allow optional paths, @@ -24,6 +25,14 @@ impl TryFrom for ConfigValuePath { } } +impl TryFrom for ConfigValuePath { + type Error = PathParseError; + + fn try_from(src: KeyString) -> Result { + OwnedValuePath::try_from(String::from(src)).map(ConfigValuePath) + } +} + impl From for String { fn from(owned: ConfigValuePath) -> Self { String::from(owned.0) @@ -60,6 +69,14 @@ impl TryFrom for ConfigTargetPath { } } +impl TryFrom for ConfigTargetPath { + type Error = PathParseError; + + fn try_from(src: KeyString) -> Result { + OwnedTargetPath::try_from(src).map(ConfigTargetPath) + } +} + impl From for String { fn from(owned: ConfigTargetPath) -> Self { String::from(owned.0) diff --git a/lib/vector-vrl/tests/Cargo.toml b/lib/vector-vrl/tests/Cargo.toml index 65e8459cdd7fe..72aeae3861191 100644 --- a/lib/vector-vrl/tests/Cargo.toml +++ b/lib/vector-vrl/tests/Cargo.toml @@ -7,7 +7,7 @@ publish = false [dependencies] enrichment = { path = "../../enrichment" } -vrl = { version = "0.8.1", features = ["test", "test_framework"] } +vrl.workspace = true vector-vrl-functions = { path = "../../vector-vrl/functions" } ansi_term = "0.12" diff --git a/lib/vector-vrl/tests/src/test_enrichment.rs b/lib/vector-vrl/tests/src/test_enrichment.rs index 71ee66d87ff20..725c4cd4a2ab6 100644 --- a/lib/vector-vrl/tests/src/test_enrichment.rs +++ b/lib/vector-vrl/tests/src/test_enrichment.rs @@ -1,5 +1,5 @@ -use std::collections::{BTreeMap, HashMap}; -use vrl::value::Value; +use std::collections::HashMap; +use vrl::value::{ObjectMap, Value}; #[derive(Debug, Clone)] struct TestEnrichmentTable; @@ -11,11 +11,11 @@ impl enrichment::Table for TestEnrichmentTable { _condition: &'a [enrichment::Condition<'a>], _select: Option<&[String]>, _index: Option, - ) -> Result, String> { - let mut result = BTreeMap::new(); - result.insert("id".to_string(), Value::from(1)); - result.insert("firstname".to_string(), Value::from("Bob")); - result.insert("surname".to_string(), Value::from("Smith")); + ) -> Result { + let mut result = ObjectMap::new(); + result.insert("id".into(), Value::from(1)); + result.insert("firstname".into(), Value::from("Bob")); + result.insert("surname".into(), Value::from("Smith")); Ok(result) } @@ -26,16 +26,16 @@ impl enrichment::Table for TestEnrichmentTable { _condition: &'a [enrichment::Condition<'a>], _select: Option<&[String]>, _index: Option, - ) -> Result>, String> { - let mut result1 = BTreeMap::new(); - result1.insert("id".to_string(), Value::from(1)); - result1.insert("firstname".to_string(), Value::from("Bob")); - result1.insert("surname".to_string(), Value::from("Smith")); + ) -> Result, String> { + let mut result1 = ObjectMap::new(); + result1.insert("id".into(), Value::from(1)); + result1.insert("firstname".into(), Value::from("Bob")); + result1.insert("surname".into(), Value::from("Smith")); - let mut result2 = BTreeMap::new(); - result2.insert("id".to_string(), Value::from(2)); - result2.insert("firstname".to_string(), Value::from("Fred")); - result2.insert("surname".to_string(), Value::from("Smith")); + let mut result2 = ObjectMap::new(); + result2.insert("id".into(), Value::from(2)); + result2.insert("firstname".into(), Value::from("Fred")); + result2.insert("surname".into(), Value::from("Smith")); Ok(vec![result1, result2]) } @@ -61,7 +61,7 @@ impl enrichment::Table for TestEnrichmentTable { pub(crate) fn test_enrichment_table() -> enrichment::TableRegistry { let registry = enrichment::TableRegistry::default(); let mut tables: HashMap> = HashMap::new(); - tables.insert("test".to_string(), Box::new(TestEnrichmentTable)); + tables.insert("test".into(), Box::new(TestEnrichmentTable)); registry.load(tables); registry diff --git a/lib/vector-vrl/web-playground/build.rs b/lib/vector-vrl/web-playground/build.rs index c12e6767bfdbd..f6d42756f1057 100644 --- a/lib/vector-vrl/web-playground/build.rs +++ b/lib/vector-vrl/web-playground/build.rs @@ -53,7 +53,7 @@ fn write_build_constants(manifest: &Manifest, dest_path: &Path) -> io::Result<() .unwrap() .version .clone() - .unwrap(); + .unwrap_or_else(|| "FIXME".into()); let vrl_version_const = create_const_statement("VRL_VERSION", vrl_version); output_file .write_all(vrl_version_const.as_bytes()) diff --git a/src/api/schema/events/metric.rs b/src/api/schema/events/metric.rs index 31786b579bfc5..77247ac9b365f 100644 --- a/src/api/schema/events/metric.rs +++ b/src/api/schema/events/metric.rs @@ -1,5 +1,3 @@ -use std::collections::BTreeMap; - use async_graphql::{Enum, Object}; use chrono::{DateTime, Utc}; use serde_json::Value; @@ -7,7 +5,7 @@ use vector_lib::encode_logfmt; use super::EventEncodingType; use crate::{ - event::{self}, + event::{self, KeyString}, topology::TapOutput, }; @@ -130,7 +128,7 @@ impl Metric { .expect("logfmt serialization of metric event failed: conversion to serde Value failed. Please report."); match json { Value::Object(map) => encode_logfmt::encode_map( - &map.into_iter().collect::>(), + &map.into_iter().map(|(k,v)| (KeyString::from(k), v)).collect(), ) .expect("logfmt serialization of metric event failed. Please report."), _ => panic!("logfmt serialization of metric event failed: metric converted to unexpected serde Value. Please report."), diff --git a/src/codecs/encoding/transformer.rs b/src/codecs/encoding/transformer.rs index 237f6aa8ae1b6..ed679d9fd728d 100644 --- a/src/codecs/encoding/transformer.rs +++ b/src/codecs/encoding/transformer.rs @@ -150,7 +150,7 @@ impl Transformer { let mut new_log = LogEvent::from(old_value); if let Some(service) = new_log.remove(service_path) { log.metadata_mut() - .add_dropped_field(meaning::SERVICE.to_string(), service); + .add_dropped_field(meaning::SERVICE.into(), service); } } } @@ -171,7 +171,7 @@ impl Transformer { if let (Some(v), Some(service_path)) = (value, service_path) { if service_path.path == *value_path { log.metadata_mut() - .add_dropped_field(meaning::SERVICE.to_string(), v); + .add_dropped_field(meaning::SERVICE.into(), v); } } } diff --git a/src/enrichment_tables/file.rs b/src/enrichment_tables/file.rs index c63993084897f..5f1e63cce1214 100644 --- a/src/enrichment_tables/file.rs +++ b/src/enrichment_tables/file.rs @@ -1,18 +1,12 @@ //! Handles enrichment tables for `type = file`. -use std::{ - collections::{BTreeMap, HashMap}, - fs, - hash::Hasher, - path::PathBuf, - time::SystemTime, -}; +use std::{collections::HashMap, fs, hash::Hasher, path::PathBuf, time::SystemTime}; use bytes::Bytes; use tracing::trace; use vector_lib::configurable::configurable_component; use vector_lib::enrichment::{Case, Condition, IndexHandle, Table}; use vector_lib::{conversion::Conversion, TimeZone}; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; use crate::config::EnrichmentTableConfig; @@ -311,7 +305,7 @@ impl File { }) } - fn add_columns(&self, select: Option<&[String]>, row: &[Value]) -> BTreeMap { + fn add_columns(&self, select: Option<&[String]>, row: &[Value]) -> ObjectMap { self.headers .iter() .zip(row) @@ -321,7 +315,7 @@ impl File { // If no select is passed, we assume all columns are included .unwrap_or(true) }) - .map(|(header, col)| (header.clone(), col.clone())) + .map(|(header, col)| (header.as_str().into(), col.clone())) .collect() } @@ -399,7 +393,7 @@ impl File { case: Case, condition: &'a [Condition<'a>], select: Option<&'a [String]>, - ) -> impl Iterator> + 'a + ) -> impl Iterator + 'a where I: Iterator> + 'a, { @@ -484,7 +478,7 @@ impl Table for File { condition: &'a [Condition<'a>], select: Option<&'a [String]>, index: Option, - ) -> Result, String> { + ) -> Result { match index { None => { // No index has been passed so we need to do a Sequential Scan. @@ -509,7 +503,7 @@ impl Table for File { condition: &'a [Condition<'a>], select: Option<&'a [String]>, index: Option, - ) -> Result>, String> { + ) -> Result, String> { match index { None => { // No index has been passed so we need to do a Sequential Scan. @@ -717,9 +711,9 @@ mod tests { }; assert_eq!( - Ok(BTreeMap::from([ - (String::from("field1"), Value::from("zirp")), - (String::from("field2"), Value::from("zurp")), + Ok(ObjectMap::from([ + ("field1".into(), Value::from("zirp")), + ("field2".into(), Value::from("zurp")), ])), file.find_table_row(Case::Sensitive, &[condition], None, None) ); @@ -785,9 +779,9 @@ mod tests { }; assert_eq!( - Ok(BTreeMap::from([ - (String::from("field1"), Value::from("zirp")), - (String::from("field2"), Value::from("zurp")), + Ok(ObjectMap::from([ + ("field1".into(), Value::from("zirp")), + ("field2".into(), Value::from("zurp")), ])), file.find_table_row(Case::Sensitive, &[condition], None, Some(handle)) ); @@ -810,13 +804,13 @@ mod tests { assert_eq!( Ok(vec![ - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field2"), Value::from("zup")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field2".into(), Value::from("zup")), ]), - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field2"), Value::from("zoop")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field2".into(), Value::from("zoop")), ]), ]), file.find_table_rows( @@ -870,13 +864,13 @@ mod tests { assert_eq!( Ok(vec![ - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field3"), Value::from("zoop")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field3".into(), Value::from("zoop")), ]), - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field3"), Value::from("zibble")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field3".into(), Value::from("zibble")), ]), ]), file.find_table_rows( @@ -905,13 +899,13 @@ mod tests { assert_eq!( Ok(vec![ - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field2"), Value::from("zup")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field2".into(), Value::from("zup")), ]), - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field2"), Value::from("zoop")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field2".into(), Value::from("zoop")), ]), ]), file.find_table_rows( @@ -927,13 +921,13 @@ mod tests { assert_eq!( Ok(vec![ - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field2"), Value::from("zup")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field2".into(), Value::from("zup")), ]), - BTreeMap::from([ - (String::from("field1"), Value::from("zip")), - (String::from("field2"), Value::from("zoop")), + ObjectMap::from([ + ("field1".into(), Value::from("zip")), + ("field2".into(), Value::from("zoop")), ]), ]), file.find_table_rows( @@ -997,10 +991,10 @@ mod tests { ]; assert_eq!( - Ok(BTreeMap::from([ - (String::from("field1"), Value::from("zip")), + Ok(ObjectMap::from([ + ("field1".into(), Value::from("zip")), ( - String::from("field2"), + "field2".into(), Value::Timestamp( chrono::Utc .with_ymd_and_hms(2016, 12, 7, 0, 0, 0) diff --git a/src/enrichment_tables/geoip.rs b/src/enrichment_tables/geoip.rs index 1443c4252f72c..c9b5284665de8 100644 --- a/src/enrichment_tables/geoip.rs +++ b/src/enrichment_tables/geoip.rs @@ -11,10 +11,9 @@ use maxminddb::{ MaxMindDBError, Reader, }; use ordered_float::NotNan; -use vrl::value::Value; - use vector_lib::configurable::configurable_component; use vector_lib::enrichment::{Case, Condition, IndexHandle, Table}; +use vrl::value::{ObjectMap, Value}; use crate::config::{EnrichmentTableConfig, GenerateConfig}; @@ -134,14 +133,14 @@ impl Geoip { } } - fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option> { - let mut map = BTreeMap::new(); + fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option { + let mut map = ObjectMap::new(); let mut add_field = |key: &str, value: Option| { if select .map(|fields| fields.iter().any(|field| field == key)) .unwrap_or(true) { - map.insert(key.to_string(), value.unwrap_or(Value::Null)); + map.insert(key.into(), value.unwrap_or(Value::Null)); } }; @@ -245,7 +244,7 @@ impl Table for Geoip { condition: &'a [Condition<'a>], select: Option<&[String]>, index: Option, - ) -> Result, String> { + ) -> Result { let mut rows = self.find_table_rows(case, condition, select, index)?; match rows.pop() { @@ -264,7 +263,7 @@ impl Table for Geoip { condition: &'a [Condition<'a>], select: Option<&[String]>, _: Option, - ) -> Result>, String> { + ) -> Result, String> { match condition.get(0) { Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()), Some(Condition::Equals { value, .. }) => { @@ -326,18 +325,18 @@ mod tests { fn city_lookup() { let values = find("2.125.160.216", "tests/data/GeoIP2-City-Test.mmdb").unwrap(); - let mut expected = BTreeMap::::new(); - expected.insert("city_name".to_string(), "Boxford".into()); - expected.insert("country_code".to_string(), "GB".into()); - expected.insert("continent_code".to_string(), "EU".into()); - expected.insert("country_name".to_string(), "United Kingdom".into()); - expected.insert("region_code".to_string(), "WBK".into()); - expected.insert("region_name".to_string(), "West Berkshire".into()); - expected.insert("timezone".to_string(), "Europe/London".into()); - expected.insert("latitude".to_string(), Value::from(51.75)); - expected.insert("longitude".to_string(), Value::from(-1.25)); - expected.insert("postal_code".to_string(), "OX1".into()); - expected.insert("metro_code".to_string(), Value::Null); + let mut expected = ObjectMap::new(); + expected.insert("city_name".into(), "Boxford".into()); + expected.insert("country_code".into(), "GB".into()); + expected.insert("continent_code".into(), "EU".into()); + expected.insert("country_name".into(), "United Kingdom".into()); + expected.insert("region_code".into(), "WBK".into()); + expected.insert("region_name".into(), "West Berkshire".into()); + expected.insert("timezone".into(), "Europe/London".into()); + expected.insert("latitude".into(), Value::from(51.75)); + expected.insert("longitude".into(), Value::from(-1.25)); + expected.insert("postal_code".into(), "OX1".into()); + expected.insert("metro_code".into(), Value::Null); assert_eq!(values, expected); } @@ -351,9 +350,9 @@ mod tests { ) .unwrap(); - let mut expected = BTreeMap::::new(); - expected.insert("latitude".to_string(), Value::from(51.75)); - expected.insert("longitude".to_string(), Value::from(-1.25)); + let mut expected = ObjectMap::new(); + expected.insert("latitude".into(), Value::from(51.75)); + expected.insert("longitude".into(), Value::from(-1.25)); assert_eq!(values, expected); } @@ -362,18 +361,18 @@ mod tests { fn city_lookup_partial_results() { let values = find("67.43.156.9", "tests/data/GeoIP2-City-Test.mmdb").unwrap(); - let mut expected = BTreeMap::::new(); - expected.insert("city_name".to_string(), Value::Null); - expected.insert("country_code".to_string(), "BT".into()); - expected.insert("country_name".to_string(), "Bhutan".into()); - expected.insert("continent_code".to_string(), "AS".into()); - expected.insert("region_code".to_string(), Value::Null); - expected.insert("region_name".to_string(), Value::Null); - expected.insert("timezone".to_string(), "Asia/Thimphu".into()); - expected.insert("latitude".to_string(), Value::from(27.5)); - expected.insert("longitude".to_string(), Value::from(90.5)); - expected.insert("postal_code".to_string(), Value::Null); - expected.insert("metro_code".to_string(), Value::Null); + let mut expected = ObjectMap::new(); + expected.insert("city_name".into(), Value::Null); + expected.insert("country_code".into(), "BT".into()); + expected.insert("country_name".into(), "Bhutan".into()); + expected.insert("continent_code".into(), "AS".into()); + expected.insert("region_code".into(), Value::Null); + expected.insert("region_name".into(), Value::Null); + expected.insert("timezone".into(), "Asia/Thimphu".into()); + expected.insert("latitude".into(), Value::from(27.5)); + expected.insert("longitude".into(), Value::from(90.5)); + expected.insert("postal_code".into(), Value::Null); + expected.insert("metro_code".into(), Value::Null); assert_eq!(values, expected); } @@ -389,14 +388,14 @@ mod tests { fn isp_lookup() { let values = find("208.192.1.2", "tests/data/GeoIP2-ISP-Test.mmdb").unwrap(); - let mut expected = BTreeMap::::new(); - expected.insert("autonomous_system_number".to_string(), 701i64.into()); + let mut expected = ObjectMap::new(); + expected.insert("autonomous_system_number".into(), 701i64.into()); expected.insert( - "autonomous_system_organization".to_string(), + "autonomous_system_organization".into(), "MCI Communications Services, Inc. d/b/a Verizon Business".into(), ); - expected.insert("isp".to_string(), "Verizon Business".into()); - expected.insert("organization".to_string(), "Verizon Business".into()); + expected.insert("isp".into(), "Verizon Business".into()); + expected.insert("organization".into(), "Verizon Business".into()); assert_eq!(values, expected); } @@ -405,14 +404,14 @@ mod tests { fn isp_lookup_partial_results() { let values = find("2600:7000::1", "tests/data/GeoLite2-ASN-Test.mmdb").unwrap(); - let mut expected = BTreeMap::::new(); - expected.insert("autonomous_system_number".to_string(), 6939i64.into()); + let mut expected = ObjectMap::new(); + expected.insert("autonomous_system_number".into(), 6939i64.into()); expected.insert( - "autonomous_system_organization".to_string(), + "autonomous_system_organization".into(), "Hurricane Electric, Inc.".into(), ); - expected.insert("isp".to_string(), Value::Null); - expected.insert("organization".to_string(), Value::Null); + expected.insert("isp".into(), Value::Null); + expected.insert("organization".into(), Value::Null); assert_eq!(values, expected); } @@ -432,8 +431,8 @@ mod tests { ) .unwrap(); - let mut expected = BTreeMap::::new(); - expected.insert("connection_type".to_string(), "Corporate".into()); + let mut expected = ObjectMap::new(); + expected.insert("connection_type".into(), "Corporate".into()); assert_eq!(values, expected); } @@ -445,15 +444,11 @@ mod tests { assert!(values.is_none()); } - fn find(ip: &str, database: &str) -> Option> { + fn find(ip: &str, database: &str) -> Option { find_select(ip, database, None) } - fn find_select( - ip: &str, - database: &str, - select: Option<&[String]>, - ) -> Option> { + fn find_select(ip: &str, database: &str, select: Option<&[String]>) -> Option { Geoip::new(GeoipConfig { path: database.to_string(), locale: default_locale(), diff --git a/src/sinks/datadog/traces/apm_stats/aggregation.rs b/src/sinks/datadog/traces/apm_stats/aggregation.rs index 7d990b6c803e4..e1897df613233 100644 --- a/src/sinks/datadog/traces/apm_stats/aggregation.rs +++ b/src/sinks/datadog/traces/apm_stats/aggregation.rs @@ -7,7 +7,7 @@ use super::{ bucket::Bucket, ClientStatsBucket, ClientStatsPayload, PartitionKey, BUCKET_DURATION_NANOSECONDS, }; -use crate::event::{TraceEvent, Value}; +use crate::event::{ObjectMap, TraceEvent, Value}; const MEASURED_KEY: &str = "_dd.measured"; const PARTIAL_VERSION_KEY: &str = "_dd.partial_version"; @@ -26,7 +26,7 @@ pub(crate) struct AggregationKey { impl AggregationKey { fn new_aggregation_from_span( - span: &BTreeMap, + span: &ObjectMap, payload_key: PayloadAggregationKey, synthetics: bool, ) -> Self { @@ -71,7 +71,7 @@ pub(crate) struct PayloadAggregationKey { } impl PayloadAggregationKey { - fn with_span_context(self, span: &BTreeMap) -> Self { + fn with_span_context(self, span: &ObjectMap) -> Self { PayloadAggregationKey { env: span .get("meta") @@ -221,7 +221,7 @@ impl Aggregator { /// The key is constructed from various span/trace properties (see `AggregationKey`). fn handle_span( &mut self, - span: &BTreeMap, + span: &ObjectMap, weight: f64, is_top: bool, synthetics: bool, @@ -387,7 +387,7 @@ const fn align_timestamp(start: u64) -> u64 { /// Assumes that all metrics are all encoded as Value::Float. /// Return the f64 of the specified key or None of key not present. -fn get_metric_value_float(span: &BTreeMap, key: &str) -> Option { +fn get_metric_value_float(span: &ObjectMap, key: &str) -> Option { span.get("metrics") .and_then(|m| m.as_object()) .map(|m| match m.get(key) { @@ -399,7 +399,7 @@ fn get_metric_value_float(span: &BTreeMap, key: &str) -> Option, key: &str) -> bool { +fn metric_value_is_1(span: &ObjectMap, key: &str) -> bool { match get_metric_value_float(span, key) { Some(f) => f == 1.0, None => false, @@ -407,14 +407,14 @@ fn metric_value_is_1(span: &BTreeMap, key: &str) -> bool { } /// Returns true if span is top-level. -fn has_top_level(span: &BTreeMap) -> bool { +fn has_top_level(span: &ObjectMap) -> bool { // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L28-L31 metric_value_is_1(span, TOP_LEVEL_KEY) } /// Returns true if a span should be measured (i.e. it should get trace metrics calculated). -fn is_measured(span: &BTreeMap) -> bool { +fn is_measured(span: &ObjectMap) -> bool { // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L40-L43 metric_value_is_1(span, MEASURED_KEY) @@ -424,7 +424,7 @@ fn is_measured(span: &BTreeMap) -> bool { /// These types of spans are partial images of long-running spans. /// When incomplete, a partial snapshot has a metric _dd.partial_version which is a positive integer. /// The metric usually increases each time a new version of the same span is sent by the tracer -fn is_partial_snapshot(span: &BTreeMap) -> bool { +fn is_partial_snapshot(span: &ObjectMap) -> bool { // Based on: https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/traceutil/span.go#L49-L52 match get_metric_value_float(span, PARTIAL_VERSION_KEY) { diff --git a/src/sinks/datadog/traces/apm_stats/bucket.rs b/src/sinks/datadog/traces/apm_stats/bucket.rs index a1d0a9f198508..7dd2334e7fe27 100644 --- a/src/sinks/datadog/traces/apm_stats/bucket.rs +++ b/src/sinks/datadog/traces/apm_stats/bucket.rs @@ -6,7 +6,7 @@ use super::{ aggregation::{AggregationKey, PayloadAggregationKey}, ddsketch_full, ClientGroupedStats, ClientStatsBucket, }; -use crate::{event::Value, metrics::AgentDDSketch}; +use crate::{event::ObjectMap, event::Value, metrics::AgentDDSketch}; pub(crate) struct GroupedStats { hits: f64, @@ -142,7 +142,7 @@ impl Bucket { pub(crate) fn add( &mut self, - span: &BTreeMap, + span: &ObjectMap, weight: f64, is_top: bool, aggkey: AggregationKey, @@ -159,7 +159,7 @@ impl Bucket { /// Update a bucket with a new span. Computed statistics include the number of hits and the actual distribution of /// execution time, with isolated measurements for spans flagged as errored and spans without error. - fn update(span: &BTreeMap, weight: f64, is_top: bool, gs: &mut GroupedStats) { + fn update(span: &ObjectMap, weight: f64, is_top: bool, gs: &mut GroupedStats) { is_top.then(|| { gs.top_level_hits += weight; }); diff --git a/src/sinks/datadog/traces/apm_stats/weight.rs b/src/sinks/datadog/traces/apm_stats/weight.rs index dfce6fbdb9f57..48d6195e06606 100644 --- a/src/sinks/datadog/traces/apm_stats/weight.rs +++ b/src/sinks/datadog/traces/apm_stats/weight.rs @@ -1,11 +1,11 @@ use std::collections::BTreeMap; -use crate::event::Value; +use crate::event::{ObjectMap, Value}; const SAMPLING_RATE_KEY: &str = "_sample_rate"; /// This extracts the relative weights from the top level span (i.e. the span that does not have a parent). -pub(crate) fn extract_weight_from_root_span(spans: &[&BTreeMap]) -> f64 { +pub(crate) fn extract_weight_from_root_span(spans: &[&ObjectMap]) -> f64 { // Based on https://github.com/DataDog/datadog-agent/blob/cfa750c7412faa98e87a015f8ee670e5828bbe7f/pkg/trace/stats/weight.go#L17-L26. // TODO this logic likely has a bug(s) that need to be root caused. The root span is not reliably found and defaults to "1.0" diff --git a/src/sinks/datadog/traces/request_builder.rs b/src/sinks/datadog/traces/request_builder.rs index 6822f172b6680..124b82d503d48 100644 --- a/src/sinks/datadog/traces/request_builder.rs +++ b/src/sinks/datadog/traces/request_builder.rs @@ -20,7 +20,7 @@ use super::{ sink::PartitionKey, }; use crate::{ - event::{Event, TraceEvent, Value}, + event::{Event, ObjectMap, TraceEvent, Value}, sinks::util::{ metadata::RequestMetadataBuilder, Compression, Compressor, IncrementalRequestBuilder, }, @@ -268,7 +268,7 @@ fn encode_trace(trace: &TraceEvent) -> dd_proto::TracerPayload { .and_then(|m| m.as_object()) .map(|m| { m.iter() - .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned())) + .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned())) .collect::>() }) .unwrap_or_default(); @@ -340,7 +340,7 @@ fn encode_trace(trace: &TraceEvent) -> dd_proto::TracerPayload { } } -fn convert_span(span: &BTreeMap) -> dd_proto::Span { +fn convert_span(span: &ObjectMap) -> dd_proto::Span { let trace_id = match span.get("trace_id") { Some(Value::Integer(val)) => *val, _ => 0, @@ -371,7 +371,7 @@ fn convert_span(span: &BTreeMap) -> dd_proto::Span { .and_then(|m| m.as_object()) .map(|m| { m.iter() - .map(|(k, v)| (k.clone(), v.to_string_lossy().into_owned())) + .map(|(k, v)| (k.to_string(), v.to_string_lossy().into_owned())) .collect::>() }) .unwrap_or_default(); @@ -381,7 +381,7 @@ fn convert_span(span: &BTreeMap) -> dd_proto::Span { .and_then(|m| m.as_object()) .map(|m| { m.iter() - .map(|(k, v)| (k.clone(), v.coerce_to_bytes().into_iter().collect())) + .map(|(k, v)| (k.to_string(), v.coerce_to_bytes().into_iter().collect())) .collect::>>() }) .unwrap_or_default(); @@ -393,7 +393,7 @@ fn convert_span(span: &BTreeMap) -> dd_proto::Span { m.iter() .filter_map(|(k, v)| { if let Value::Float(f) = v { - Some((k.clone(), f.into_inner())) + Some((k.to_string(), f.into_inner())) } else { None } diff --git a/src/sinks/datadog/traces/tests.rs b/src/sinks/datadog/traces/tests.rs index ed9c19378830b..49b162cdddd3b 100644 --- a/src/sinks/datadog/traces/tests.rs +++ b/src/sinks/datadog/traces/tests.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::sync::Arc; use bytes::Bytes; use chrono::{TimeZone, Utc}; @@ -15,7 +15,7 @@ use super::{apm_stats::StatsPayload, dd_proto, ddsketch_full, DatadogTracesConfi use crate::{ config::SinkConfig, - event::{TraceEvent, Value}, + event::{ObjectMap, TraceEvent, Value}, sinks::util::test::{build_test_server_status, load_sink}, test_util::{ components::{assert_sink_compliance, SINK_TAGS}, @@ -56,39 +56,33 @@ async fn start_test( .await } -fn simple_span(resource: String) -> BTreeMap { - BTreeMap::::from([ - ("service".to_string(), Value::from("a_service")), - ("name".to_string(), Value::from("a_name")), - ("resource".to_string(), Value::from(resource)), - ("type".to_string(), Value::from("a_type")), - ("trace_id".to_string(), Value::Integer(123)), - ("span_id".to_string(), Value::Integer(456)), - ("parent_id".to_string(), Value::Integer(789)), +fn simple_span(resource: String) -> ObjectMap { + ObjectMap::from([ + ("service".into(), Value::from("a_service")), + ("name".into(), Value::from("a_name")), + ("resource".into(), Value::from(resource)), + ("type".into(), Value::from("a_type")), + ("trace_id".into(), Value::Integer(123)), + ("span_id".into(), Value::Integer(456)), + ("parent_id".into(), Value::Integer(789)), ( - "start".to_string(), + "start".into(), Value::from(Utc.timestamp_nanos(1_431_648_000_000_001i64)), ), - ("duration".to_string(), Value::Integer(1_000_000)), - ("error".to_string(), Value::Integer(404)), + ("duration".into(), Value::Integer(1_000_000)), + ("error".into(), Value::Integer(404)), ( - "meta".to_string(), - Value::Object(BTreeMap::::from([ - ("foo".to_string(), Value::from("bar")), - ("bar".to_string(), Value::from("baz")), + "meta".into(), + Value::Object(ObjectMap::from([ + ("foo".into(), Value::from("bar")), + ("bar".into(), Value::from("baz")), ])), ), ( - "metrics".to_string(), - Value::Object(BTreeMap::::from([ - ( - "a_metric".to_string(), - Value::Float(NotNan::new(0.577).unwrap()), - ), - ( - "_top_level".to_string(), - Value::Float(NotNan::new(1.0).unwrap()), - ), + "metrics".into(), + Value::Object(ObjectMap::from([ + ("a_metric".into(), Value::Float(NotNan::new(0.577).unwrap())), + ("_top_level".into(), Value::Float(NotNan::new(1.0).unwrap())), ])), ), ]) diff --git a/src/sinks/elasticsearch/tests.rs b/src/sinks/elasticsearch/tests.rs index b1688cf65e5ba..a3ed57a27f420 100644 --- a/src/sinks/elasticsearch/tests.rs +++ b/src/sinks/elasticsearch/tests.rs @@ -1,10 +1,10 @@ -use std::{collections::BTreeMap, convert::TryFrom}; +use std::convert::TryFrom; use vector_lib::lookup::PathPrefix; use crate::{ codecs::Transformer, - event::{LogEvent, Metric, MetricKind, MetricValue, Value}, + event::{LogEvent, Metric, MetricKind, MetricValue, ObjectMap, Value}, sinks::{ elasticsearch::{ sink::process_log, BulkAction, BulkConfig, DataStreamConfig, ElasticsearchApiVersion, @@ -67,8 +67,8 @@ fn data_stream_body( dtype: Option, dataset: Option, namespace: Option, -) -> BTreeMap { - let mut ds = BTreeMap::::new(); +) -> ObjectMap { + let mut ds = ObjectMap::new(); if let Some(dtype) = dtype { ds.insert("type".into(), Value::from(dtype)); diff --git a/src/sinks/influxdb/logs.rs b/src/sinks/influxdb/logs.rs index e668191dbf224..bd5c8460407f3 100644 --- a/src/sinks/influxdb/logs.rs +++ b/src/sinks/influxdb/logs.rs @@ -14,17 +14,17 @@ use vector_lib::lookup::lookup_v2::OptionalValuePath; use vector_lib::lookup::PathPrefix; use vector_lib::schema; +use super::{ + encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field, + InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion, +}; use crate::{ codecs::Transformer, config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, - event::{Event, MetricTags, Value}, + event::{Event, KeyString, MetricTags, Value}, http::HttpClient, internal_events::InfluxdbEncodingError, sinks::{ - influxdb::{ - encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field, - InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion, - }, util::{ http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, BatchConfig, Buffer, Compression, SinkBatchSettings, TowerRequestConfig, @@ -75,7 +75,7 @@ pub struct InfluxDbLogsConfig { #[serde(default)] #[configurable(metadata(docs::examples = "field1"))] #[configurable(metadata(docs::examples = "parent.child_field"))] - pub tags: Vec, + pub tags: Vec, #[serde(flatten)] pub influxdb1_settings: Option, @@ -138,7 +138,7 @@ struct InfluxDbLogsSink { token: String, protocol_version: ProtocolVersion, measurement: String, - tags: HashSet, + tags: HashSet, transformer: Transformer, host_key: OwnedValuePath, message_key: OwnedValuePath, @@ -164,7 +164,7 @@ impl GenerateConfig for InfluxDbLogsConfig { impl SinkConfig for InfluxDbLogsConfig { async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { let measurement = self.get_measurement()?; - let tags: HashSet = self.tags.clone().into_iter().collect(); + let tags: HashSet = self.tags.iter().cloned().collect(); let tls_settings = TlsSettings::from_options(&self.tls)?; let client = HttpClient::new(tls_settings, cx.proxy())?; @@ -251,7 +251,7 @@ impl SinkConfig for InfluxDbLogsConfig { struct InfluxDbLogsEncoder { protocol_version: ProtocolVersion, measurement: String, - tags: HashSet, + tags: HashSet, transformer: Transformer, host_key: OwnedValuePath, message_key: OwnedValuePath, @@ -271,16 +271,16 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { // Add the `host` and `source_type` to the HashSet of tags to include // Ensure those paths are on the event to be encoded, rather than metadata if let Some(host_path) = log.host_path().cloned().as_ref() { - self.tags.replace(host_path.path.to_string()); + self.tags.replace(host_path.path.to_string().into()); log.rename_key(host_path, (PathPrefix::Event, &self.host_key)); } if let Some(source_type_path) = log.source_type_path().cloned().as_ref() { - self.tags.replace(source_type_path.path.to_string()); + self.tags.replace(source_type_path.path.to_string().into()); log.rename_key(source_type_path, (PathPrefix::Event, &self.source_type_key)); } - self.tags.replace("metric_type".to_string()); + self.tags.replace("metric_type".into()); log.insert(event_path!("metric_type"), "logs"); // Timestamp @@ -297,10 +297,10 @@ impl HttpEventEncoder for InfluxDbLogsEncoder { // Tags + Fields let mut tags = MetricTags::default(); - let mut fields: HashMap = HashMap::new(); + let mut fields: HashMap = HashMap::new(); log.convert_to_fields().for_each(|(key, value)| { - if self.tags.contains(&key) { - tags.replace(key, value.to_string_lossy().into_owned()); + if self.tags.contains(&key[..]) { + tags.replace(key.into(), value.to_string_lossy().into_owned()); } else { fields.insert(key, to_field(value)); } @@ -857,7 +857,7 @@ mod tests { let uri = uri.parse::().unwrap(); let token = token.to_string(); let measurement = measurement.to_string(); - let tags: HashSet = tags.into_iter().map(|tag| tag.to_string()).collect(); + let tags: HashSet<_> = tags.into_iter().map(|tag| tag.into()).collect(); InfluxDbLogsSink { uri, token, diff --git a/src/sinks/influxdb/metrics.rs b/src/sinks/influxdb/metrics.rs index cf5e5c7bd2549..597c3e2190602 100644 --- a/src/sinks/influxdb/metrics.rs +++ b/src/sinks/influxdb/metrics.rs @@ -14,7 +14,7 @@ use crate::{ config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext}, event::{ metric::{Metric, MetricValue, Sample, StatisticKind}, - Event, + Event, KeyString, }, http::HttpClient, internal_events::InfluxdbEncodingError, @@ -319,7 +319,7 @@ fn encode_events( fn get_type_and_fields( value: &MetricValue, quantiles: &[f64], -) -> (&'static str, Option>) { +) -> (&'static str, Option>) { match value { MetricValue::Counter { value } => ("counter", Some(to_fields(*value))), MetricValue::Gauge { value } => ("gauge", Some(to_fields(*value))), @@ -329,17 +329,17 @@ fn get_type_and_fields( count, sum, } => { - let mut fields: HashMap = buckets + let mut fields: HashMap = buckets .iter() .map(|sample| { ( - format!("bucket_{}", sample.upper_limit), + format!("bucket_{}", sample.upper_limit).into(), Field::UnsignedInt(sample.count), ) }) .collect(); - fields.insert("count".to_owned(), Field::UnsignedInt(*count)); - fields.insert("sum".to_owned(), Field::Float(*sum)); + fields.insert("count".into(), Field::UnsignedInt(*count)); + fields.insert("sum".into(), Field::Float(*sum)); ("histogram", Some(fields)) } @@ -348,17 +348,17 @@ fn get_type_and_fields( count, sum, } => { - let mut fields: HashMap = quantiles + let mut fields: HashMap = quantiles .iter() .map(|quantile| { ( - format!("quantile_{}", quantile.quantile), + format!("quantile_{}", quantile.quantile).into(), Field::Float(quantile.value), ) }) .collect(); - fields.insert("count".to_owned(), Field::UnsignedInt(*count)); - fields.insert("sum".to_owned(), Field::Float(*sum)); + fields.insert("count".into(), Field::UnsignedInt(*count)); + fields.insert("sum".into(), Field::Float(*sum)); ("summary", Some(fields)) } @@ -382,31 +382,25 @@ fn get_type_and_fields( value: ddsketch.quantile(*q).unwrap_or(0.0), }; ( - quantile.to_percentile_string(), + quantile.to_percentile_string().into(), Field::Float(quantile.value), ) }) - .collect::>(); + .collect::>(); fields.insert( - "count".to_owned(), + "count".into(), Field::UnsignedInt(u64::from(ddsketch.count())), ); fields.insert( - "min".to_owned(), + "min".into(), Field::Float(ddsketch.min().unwrap_or(f64::MAX)), ); fields.insert( - "max".to_owned(), + "max".into(), Field::Float(ddsketch.max().unwrap_or(f64::MIN)), ); - fields.insert( - "sum".to_owned(), - Field::Float(ddsketch.sum().unwrap_or(0.0)), - ); - fields.insert( - "avg".to_owned(), - Field::Float(ddsketch.avg().unwrap_or(0.0)), - ); + fields.insert("sum".into(), Field::Float(ddsketch.sum().unwrap_or(0.0))); + fields.insert("avg".into(), Field::Float(ddsketch.avg().unwrap_or(0.0))); ("sketch", Some(fields)) } @@ -414,34 +408,33 @@ fn get_type_and_fields( } } -fn encode_distribution(samples: &[Sample], quantiles: &[f64]) -> Option> { +fn encode_distribution(samples: &[Sample], quantiles: &[f64]) -> Option> { let statistic = DistributionStatistic::from_samples(samples, quantiles)?; - let fields: HashMap = vec![ - ("min".to_owned(), Field::Float(statistic.min)), - ("max".to_owned(), Field::Float(statistic.max)), - ("median".to_owned(), Field::Float(statistic.median)), - ("avg".to_owned(), Field::Float(statistic.avg)), - ("sum".to_owned(), Field::Float(statistic.sum)), - ("count".to_owned(), Field::Float(statistic.count as f64)), - ] - .into_iter() - .chain( - statistic - .quantiles - .iter() - .map(|&(p, val)| (format!("quantile_{:.2}", p), Field::Float(val))), + Some( + [ + ("min".into(), Field::Float(statistic.min)), + ("max".into(), Field::Float(statistic.max)), + ("median".into(), Field::Float(statistic.median)), + ("avg".into(), Field::Float(statistic.avg)), + ("sum".into(), Field::Float(statistic.sum)), + ("count".into(), Field::Float(statistic.count as f64)), + ] + .into_iter() + .chain( + statistic + .quantiles + .iter() + .map(|&(p, val)| (format!("quantile_{:.2}", p).into(), Field::Float(val))), + ) + .collect(), ) - .collect(); - - Some(fields) } -fn to_fields(value: f64) -> HashMap { - let fields: HashMap = vec![("value".to_owned(), Field::Float(value))] +fn to_fields(value: f64) -> HashMap { + [("value".into(), Field::Float(value))] .into_iter() - .collect(); - fields + .collect() } #[cfg(test)] diff --git a/src/sinks/influxdb/mod.rs b/src/sinks/influxdb/mod.rs index 146bd83f6ffb8..12dacd5d23d27 100644 --- a/src/sinks/influxdb/mod.rs +++ b/src/sinks/influxdb/mod.rs @@ -10,7 +10,7 @@ use http::{StatusCode, Uri}; use snafu::{ResultExt, Snafu}; use tower::Service; use vector_lib::configurable::configurable_component; -use vector_lib::event::MetricTags; +use vector_lib::event::{KeyString, MetricTags}; use vector_lib::sensitive_string::SensitiveString; use crate::http::HttpClient; @@ -232,7 +232,7 @@ pub(in crate::sinks) fn influx_line_protocol( protocol_version: ProtocolVersion, measurement: &str, tags: Option, - fields: Option>, + fields: Option>, timestamp: i64, line_protocol: &mut BytesMut, ) -> Result<(), &'static str> { @@ -284,7 +284,7 @@ fn encode_tags(tags: MetricTags, output: &mut BytesMut) { fn encode_fields( protocol_version: ProtocolVersion, - fields: HashMap, + fields: HashMap, output: &mut BytesMut, ) { let original_len = output.len(); @@ -736,20 +736,17 @@ mod tests { #[test] fn test_encode_fields_v1() { let fields = vec![ + ("field_string".into(), Field::String("string value".into())), ( - "field_string".to_owned(), - Field::String("string value".to_owned()), + "field_string_escape".into(), + Field::String("string\\val\"ue".into()), ), - ( - "field_string_escape".to_owned(), - Field::String("string\\val\"ue".to_owned()), - ), - ("field_float".to_owned(), Field::Float(123.45)), - ("field_unsigned_int".to_owned(), Field::UnsignedInt(657)), - ("field_int".to_owned(), Field::Int(657646)), - ("field_bool_true".to_owned(), Field::Bool(true)), - ("field_bool_false".to_owned(), Field::Bool(false)), - ("escape key".to_owned(), Field::Float(10.0)), + ("field_float".into(), Field::Float(123.45)), + ("field_unsigned_int".into(), Field::UnsignedInt(657)), + ("field_int".into(), Field::Int(657646)), + ("field_bool_true".into(), Field::Bool(true)), + ("field_bool_false".into(), Field::Bool(false)), + ("escape key".into(), Field::Float(10.0)), ] .into_iter() .collect(); @@ -776,20 +773,17 @@ mod tests { #[test] fn test_encode_fields() { let fields = vec![ + ("field_string".into(), Field::String("string value".into())), ( - "field_string".to_owned(), - Field::String("string value".to_owned()), - ), - ( - "field_string_escape".to_owned(), - Field::String("string\\val\"ue".to_owned()), + "field_string_escape".into(), + Field::String("string\\val\"ue".into()), ), - ("field_float".to_owned(), Field::Float(123.45)), - ("field_unsigned_int".to_owned(), Field::UnsignedInt(657)), - ("field_int".to_owned(), Field::Int(657646)), - ("field_bool_true".to_owned(), Field::Bool(true)), - ("field_bool_false".to_owned(), Field::Bool(false)), - ("escape key".to_owned(), Field::Float(10.0)), + ("field_float".into(), Field::Float(123.45)), + ("field_unsigned_int".into(), Field::UnsignedInt(657)), + ("field_int".into(), Field::Int(657646)), + ("field_bool_true".into(), Field::Bool(true)), + ("field_bool_false".into(), Field::Bool(false)), + ("escape key".into(), Field::Float(10.0)), ] .into_iter() .collect(); diff --git a/src/sinks/kafka/request_builder.rs b/src/sinks/kafka/request_builder.rs index 60cebfe51fbeb..f342b1aa22188 100644 --- a/src/sinks/kafka/request_builder.rs +++ b/src/sinks/kafka/request_builder.rs @@ -119,20 +119,18 @@ fn get_headers(event: &Event, headers_key: Option<&OwnedTargetPath>) -> Option String { #[cfg(test)] mod tests { - use std::{ - collections::{BTreeMap, HashMap}, - convert::TryFrom, - }; + use std::{collections::HashMap, convert::TryFrom}; use futures::stream::StreamExt; use vector_lib::codecs::JsonSerializerConfig; - use vector_lib::event::{Event, LogEvent, Value}; + use vector_lib::event::{Event, LogEvent, ObjectMap, Value}; use vector_lib::lookup::PathPrefix; use super::{EventEncoder, KeyPartitioner, RecordFilter}; @@ -593,9 +590,9 @@ mod tests { log.insert("name", "foo"); log.insert("value", "bar"); - let mut test_dict = BTreeMap::default(); - test_dict.insert("one".to_string(), Value::from("foo")); - test_dict.insert("two".to_string(), Value::from("baz")); + let mut test_dict = ObjectMap::default(); + test_dict.insert("one".into(), Value::from("foo")); + test_dict.insert("two".into(), Value::from("baz")); log.insert("dict", Value::from(test_dict)); let record = encoder.encode_event(event).unwrap(); @@ -650,7 +647,7 @@ mod tests { } } "#; - let msg: BTreeMap = serde_json::from_str(message)?; + let msg: ObjectMap = serde_json::from_str(message)?; let event = Event::Log(LogEvent::from(msg)); let record = encoder.encode_event(event).unwrap(); @@ -695,7 +692,7 @@ mod tests { } } "#; - let msg: BTreeMap = serde_json::from_str(message)?; + let msg: ObjectMap = serde_json::from_str(message)?; let event = Event::Log(LogEvent::from(msg)); let record = encoder.encode_event(event).unwrap(); @@ -723,7 +720,7 @@ mod tests { remove_timestamp: false, }; - let msg: BTreeMap = serde_json::from_str("{}")?; + let msg: ObjectMap = serde_json::from_str("{}")?; let event = Event::Log(LogEvent::from(msg)); let record = encoder.encode_event(event).unwrap(); diff --git a/src/sinks/new_relic/model.rs b/src/sinks/new_relic/model.rs index 1014d3c54925a..615c27fe1b035 100644 --- a/src/sinks/new_relic/model.rs +++ b/src/sinks/new_relic/model.rs @@ -12,7 +12,7 @@ use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL, UNINTENTIO use vrl::event_path; use super::NewRelicSinkError; -use crate::event::{Event, MetricKind, MetricValue, Value}; +use crate::event::{Event, KeyString, MetricKind, MetricValue, Value}; #[derive(Debug)] pub enum NewRelicApiModel { @@ -21,8 +21,8 @@ pub enum NewRelicApiModel { Logs(LogsApiModel), } -type KeyValData = HashMap; -type DataStore = HashMap>; +type KeyValData = HashMap; +type DataStore = HashMap>; #[derive(Serialize, Deserialize, Debug)] pub struct MetricsApiModel(pub Vec); @@ -30,7 +30,7 @@ pub struct MetricsApiModel(pub Vec); impl MetricsApiModel { pub fn new(metric_array: Vec) -> Self { let mut metric_store = DataStore::new(); - metric_store.insert("metrics".to_owned(), metric_array); + metric_store.insert("metrics".into(), metric_array); Self(vec![metric_store]) } } @@ -66,10 +66,8 @@ impl TryFrom> for MetricsApiModel { num_missing_interval += 1; return None; }; - metric_data.insert( - "interval.ms".to_owned(), - Value::from(interval_ms.get() as i64), - ); + metric_data + .insert("interval.ms".into(), Value::from(interval_ms.get() as i64)); (value, "count") } (MetricValue::Counter { value }, MetricKind::Absolute) => (value, "gauge"), @@ -82,15 +80,15 @@ impl TryFrom> for MetricsApiModel { }; // Set name, type, value, timestamp, and attributes - metric_data.insert("name".to_owned(), Value::from(series.name.name)); - metric_data.insert("type".to_owned(), Value::from(metric_type)); + metric_data.insert("name".into(), Value::from(series.name.name)); + metric_data.insert("type".into(), Value::from(metric_type)); let Some(value) = NotNan::new(value).ok() else { num_nan_value += 1; return None; }; - metric_data.insert("value".to_owned(), Value::from(value)); + metric_data.insert("value".into(), Value::from(value)); metric_data.insert( - "timestamp".to_owned(), + "timestamp".into(), Value::from( data.time .timestamp @@ -100,10 +98,10 @@ impl TryFrom> for MetricsApiModel { ); if let Some(tags) = series.tags { metric_data.insert( - "attributes".to_owned(), + "attributes".into(), Value::from( tags.iter_single() - .map(|(key, value)| (key.to_string(), Value::from(value))) + .map(|(key, value)| (key.into(), Value::from(value))) .collect::>(), ), ); @@ -162,7 +160,7 @@ impl TryFrom> for EventsApiModel { let mut num_non_log_events = 0; let mut num_nan_value = 0; - let events_array: Vec> = buf_events + let events_array: Vec> = buf_events .into_iter() .filter_map(|event| { let Some(log) = event.try_into_log() else { @@ -184,23 +182,23 @@ impl TryFrom> for EventsApiModel { for (k, v) in json_map { match v { serde_json::Value::String(s) => { - event_model.insert(k, Value::from(s)); + event_model.insert(k.into(), Value::from(s)); } serde_json::Value::Number(n) => { if let Some(f) = n.as_f64() { event_model.insert( - k, + k.into(), Value::from(NotNan::new(f).ok().or_else(|| { num_nan_value += 1; None })?), ); } else { - event_model.insert(k, Value::from(n.as_i64())); + event_model.insert(k.into(), Value::from(n.as_i64())); } } serde_json::Value::Bool(b) => { - event_model.insert(k, Value::from(b)); + event_model.insert(k.into(), Value::from(b)); } _ => { // Note that arrays and nested objects are silently dropped. @@ -212,8 +210,7 @@ impl TryFrom> for EventsApiModel { } if event_model.get("eventType").is_none() { - event_model - .insert("eventType".to_owned(), Value::from("VectorSink".to_owned())); + event_model.insert("eventType".into(), Value::from("VectorSink".to_owned())); } Some(event_model) @@ -247,7 +244,7 @@ pub struct LogsApiModel(pub Vec); impl LogsApiModel { pub fn new(logs_array: Vec) -> Self { let mut logs_store = DataStore::new(); - logs_store.insert("logs".to_owned(), logs_array); + logs_store.insert("logs".into(), logs_array); Self(vec![logs_store]) } } @@ -258,7 +255,7 @@ impl TryFrom> for LogsApiModel { fn try_from(buf_events: Vec) -> Result { let mut num_non_log_events = 0; - let logs_array: Vec> = buf_events + let logs_array: Vec> = buf_events .into_iter() .filter_map(|event| { let Some(log) = event.try_into_log() else { @@ -271,10 +268,7 @@ impl TryFrom> for LogsApiModel { log_model.insert(k, v.clone()); } if log.get(event_path!("message")).is_none() { - log_model.insert( - "message".to_owned(), - Value::from("log from vector".to_owned()), - ); + log_model.insert("message".into(), Value::from("log from vector".to_owned())); } Some(log_model) diff --git a/src/sinks/new_relic/tests.rs b/src/sinks/new_relic/tests.rs index 69fd59898df78..0ad63acd56aba 100644 --- a/src/sinks/new_relic/tests.rs +++ b/src/sinks/new_relic/tests.rs @@ -8,7 +8,7 @@ use vector_lib::config::{init_telemetry, Tags, Telemetry}; use super::*; use crate::{ config::{GenerateConfig, SinkConfig, SinkContext}, - event::{Event, LogEvent, Metric, MetricKind, MetricValue, Value}, + event::{Event, KeyString, LogEvent, Metric, MetricKind, MetricValue, Value}, test_util::{ components::{ run_and_assert_data_volume_sink_compliance, run_and_assert_sink_compliance, @@ -71,10 +71,10 @@ async fn component_spec_compliance_data_volume() { #[test] fn generate_event_api_model() { // Without message field - let mut map = HashMap::::new(); - map.insert("eventType".to_owned(), Value::from("TestEvent".to_owned())); - map.insert("user".to_owned(), Value::from("Joe".to_owned())); - map.insert("user_id".to_owned(), Value::from(123456)); + let mut map = HashMap::::new(); + map.insert("eventType".into(), Value::from("TestEvent".to_owned())); + map.insert("user".into(), Value::from("Joe".to_owned())); + map.insert("user_id".into(), Value::from(123456)); let event = Event::Log(LogEvent::from(map)); let model = EventsApiModel::try_from(vec![event]).expect("Failed mapping events into API model"); @@ -94,12 +94,12 @@ fn generate_event_api_model() { assert_eq!(model.0[0].get("user_id").unwrap(), &Value::Integer(123456)); // With message field - let mut map = HashMap::::new(); - map.insert("eventType".to_owned(), Value::from("TestEvent".to_owned())); - map.insert("user".to_owned(), Value::from("Joe".to_owned())); - map.insert("user_id".to_owned(), Value::from(123456)); + let mut map = HashMap::::new(); + map.insert("eventType".into(), Value::from("TestEvent".to_owned())); + map.insert("user".into(), Value::from("Joe".to_owned())); + map.insert("user_id".into(), Value::from(123456)); map.insert( - "message".to_owned(), + "message".into(), Value::from("This is a message".to_owned()), ); let event = Event::Log(LogEvent::from(map)); @@ -126,12 +126,12 @@ fn generate_event_api_model() { ); // With a JSON encoded inside the message field - let mut map = HashMap::::new(); - map.insert("eventType".to_owned(), Value::from("TestEvent".to_owned())); - map.insert("user".to_owned(), Value::from("Joe".to_owned())); - map.insert("user_id".to_owned(), Value::from(123456)); + let mut map = HashMap::::new(); + map.insert("eventType".into(), Value::from("TestEvent".to_owned())); + map.insert("user".into(), Value::from("Joe".to_owned())); + map.insert("user_id".into(), Value::from(123456)); map.insert( - "message".to_owned(), + "message".into(), Value::from("{\"my_key\" : \"my_value\"}".to_owned()), ); let event = Event::Log(LogEvent::from(map)); @@ -161,8 +161,8 @@ fn generate_event_api_model() { #[test] fn generate_log_api_model() { // Without message field - let mut map = HashMap::::new(); - map.insert("tag_key".to_owned(), Value::from("tag_value".to_owned())); + let mut map = HashMap::::new(); + map.insert("tag_key".into(), Value::from("tag_value".to_owned())); let event = Event::Log(LogEvent::from(map)); let model = LogsApiModel::try_from(vec![event]).expect("Failed mapping logs into API model"); let logs = model.0[0].get("logs").expect("Logs data store not present"); @@ -176,10 +176,10 @@ fn generate_log_api_model() { assert!(logs[0].get("message").is_some()); // With message field - let mut map = HashMap::::new(); - map.insert("tag_key".to_owned(), Value::from("tag_value".to_owned())); + let mut map = HashMap::::new(); + map.insert("tag_key".into(), Value::from("tag_value".to_owned())); map.insert( - "message".to_owned(), + "message".into(), Value::from("This is a message".to_owned()), ); let event = Event::Log(LogEvent::from(map)); diff --git a/src/sinks/pulsar/integration_tests.rs b/src/sinks/pulsar/integration_tests.rs index ac11fa940cb0b..30a3d51a23646 100644 --- a/src/sinks/pulsar/integration_tests.rs +++ b/src/sinks/pulsar/integration_tests.rs @@ -1,9 +1,8 @@ use crate::sinks::pulsar::{config::PulsarSinkConfig, sink::PulsarSink}; use futures::StreamExt; use pulsar::SubType; -use std::collections::BTreeMap; -use crate::event::Value; +use crate::event::{ObjectMap, Value}; use crate::sinks::VectorSink; use crate::template::Template; use crate::test_util::{ @@ -29,11 +28,8 @@ async fn pulsar_happy_reuse(mut cnf: PulsarSinkConfig) { // if a property_key is defined, add some properties! if let Some(properties_key) = &prop_key_opt { if let Some(properties_key) = &properties_key.path { - let mut property_values = BTreeMap::new(); - property_values.insert( - prop_1_key.to_owned(), - Value::Bytes(Bytes::from(prop_1_value)), - ); + let mut property_values = ObjectMap::new(); + property_values.insert(prop_1_key.into(), Value::Bytes(Bytes::from(prop_1_value))); events.iter_logs_mut().for_each(move |log| { log.insert(properties_key, property_values.clone()); }); diff --git a/src/sinks/pulsar/request_builder.rs b/src/sinks/pulsar/request_builder.rs index de62f179dcb30..962fbeb6eb7d6 100644 --- a/src/sinks/pulsar/request_builder.rs +++ b/src/sinks/pulsar/request_builder.rs @@ -2,6 +2,7 @@ use bytes::Bytes; use std::collections::HashMap; use std::io; +use crate::event::KeyString; use crate::sinks::{ prelude::*, pulsar::{encoder::PulsarEncoder, service::PulsarRequest, sink::PulsarEvent}, @@ -11,7 +12,7 @@ use crate::sinks::{ pub(super) struct PulsarMetadata { pub finalizers: EventFinalizers, pub key: Option, - pub properties: Option>, + pub properties: Option>, pub timestamp_millis: Option, pub topic: String, } diff --git a/src/sinks/pulsar/service.rs b/src/sinks/pulsar/service.rs index 8afabb3260207..a7afd635ef2d8 100644 --- a/src/sinks/pulsar/service.rs +++ b/src/sinks/pulsar/service.rs @@ -106,7 +106,7 @@ impl Service for PulsarService { let mut properties = HashMap::new(); if let Some(props) = request.metadata.properties { for (key, value) in props { - properties.insert(key, String::from_utf8_lossy(&value).to_string()); + properties.insert(key.into(), String::from_utf8_lossy(&value).to_string()); } } diff --git a/src/sinks/pulsar/sink.rs b/src/sinks/pulsar/sink.rs index 7bd26ac24f5a8..479d4455b97e0 100644 --- a/src/sinks/pulsar/sink.rs +++ b/src/sinks/pulsar/sink.rs @@ -4,13 +4,13 @@ use pulsar::{Error as PulsarError, Pulsar, TokioExecutor}; use serde::Serialize; use snafu::Snafu; use std::collections::HashMap; - -use crate::sinks::prelude::*; +use vrl::value::KeyString; use super::{ config::PulsarSinkConfig, encoder::PulsarEncoder, request_builder::PulsarRequestBuilder, service::PulsarService, util, }; +use crate::sinks::prelude::*; #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] @@ -37,7 +37,7 @@ pub(super) struct PulsarEvent { pub(super) event: Event, pub(super) topic: String, pub(super) key: Option, - pub(super) properties: Option>, + pub(super) properties: Option>, pub(super) timestamp_millis: Option, } @@ -56,7 +56,7 @@ impl ByteSizeOf for PulsarEvent { + self.properties.as_ref().map_or(0, |props| { props .iter() - .map(|(key, val)| key.capacity() + val.size_of()) + .map(|(key, val)| key.allocated_bytes() + val.size_of()) .sum() }) } diff --git a/src/sinks/pulsar/tests.rs b/src/sinks/pulsar/tests.rs index 3c6d8da9e608f..9a535a7417a4b 100644 --- a/src/sinks/pulsar/tests.rs +++ b/src/sinks/pulsar/tests.rs @@ -1,13 +1,10 @@ -use crate::event::Event; -use crate::sinks::pulsar::config::PulsarSinkConfig; -use std::collections::BTreeMap; +use bytes::Bytes; use vector_lib::configurable::component::GenerateConfig; use vector_lib::lookup::lookup_v2::OptionalTargetPath; -use vrl::value::Value; +use vrl::value::{ObjectMap, Value}; -use bytes::Bytes; - -use crate::event::LogEvent; +use crate::event::{Event, LogEvent}; +use crate::sinks::pulsar::config::PulsarSinkConfig; #[test] fn generate_config() { @@ -18,9 +15,9 @@ fn generate_config() { fn pulsar_get_headers() { let properties_key = OptionalTargetPath::try_from("properties".to_string()) .expect("unable to parse OptionalTargetPath"); - let mut property_values = BTreeMap::new(); - property_values.insert("a-key".to_string(), Value::Bytes(Bytes::from("a-value"))); - property_values.insert("b-key".to_string(), Value::Bytes(Bytes::from("b-value"))); + let mut property_values = ObjectMap::new(); + property_values.insert("a-key".into(), Value::Bytes(Bytes::from("a-value"))); + property_values.insert("b-key".into(), Value::Bytes(Bytes::from("b-value"))); let mut event = Event::Log(LogEvent::from("hello")); event diff --git a/src/sinks/pulsar/util.rs b/src/sinks/pulsar/util.rs index 9ae2b584f3683..af34a01653e5c 100644 --- a/src/sinks/pulsar/util.rs +++ b/src/sinks/pulsar/util.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use std::collections::HashMap; use vector_lib::event::Event; use vector_lib::lookup::lookup_v2::OptionalTargetPath; -use vrl::value::Value; +use vrl::value::{KeyString, Value}; /// Transforms an event into a Pulsar event by rendering the required template fields. /// Returns None if there is an error whilst rendering. @@ -57,7 +57,7 @@ fn get_timestamp_millis(event: &Event) -> Option { pub(super) fn get_properties( event: &Event, properties_key: &Option, -) -> Option> { +) -> Option> { properties_key.as_ref().and_then(|properties_key| { properties_key.path.as_ref().and_then(|path| { event.maybe_as_log().and_then(|log| { diff --git a/src/sinks/sematext/metrics.rs b/src/sinks/sematext/metrics.rs index 982c1393b2919..14a3f37470709 100644 --- a/src/sinks/sematext/metrics.rs +++ b/src/sinks/sematext/metrics.rs @@ -15,7 +15,7 @@ use crate::{ config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, event::{ metric::{Metric, MetricValue}, - Event, + Event, KeyString, }, http::HttpClient, internal_events::{SematextMetricsEncodeEventError, SematextMetricsInvalidMetricError}, @@ -299,9 +299,9 @@ fn encode_events( EncodedEvent::new(output.freeze(), byte_size, json_byte_size) } -fn to_fields(label: String, value: f64) -> HashMap { +fn to_fields(label: String, value: f64) -> HashMap { let mut result = HashMap::new(); - result.insert(label, Field::Float(value)); + result.insert(label.into(), Field::Float(value)); result } diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index dabaf350e42fc..4acf7238a30ac 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -152,7 +152,7 @@ mod tests { }; use vector_lib::event::LogEvent; use vector_lib::{internal_event::CountByteSize, json_size::JsonSize}; - use vrl::value::Value; + use vrl::value::{KeyString, Value}; use super::*; @@ -189,7 +189,7 @@ mod tests { let mut writer = Vec::new(); let input = vec![Event::Log(LogEvent::from(BTreeMap::from([( - String::from("key"), + KeyString::from("key"), Value::from("value"), )])))]; @@ -217,15 +217,15 @@ mod tests { let input = vec![ Event::Log(LogEvent::from(BTreeMap::from([( - String::from("key"), + KeyString::from("key"), Value::from("value1"), )]))), Event::Log(LogEvent::from(BTreeMap::from([( - String::from("key"), + KeyString::from("key"), Value::from("value2"), )]))), Event::Log(LogEvent::from(BTreeMap::from([( - String::from("key"), + KeyString::from("key"), Value::from("value3"), )]))), ]; @@ -280,7 +280,7 @@ mod tests { let mut writer = Vec::new(); let input = vec![Event::Log(LogEvent::from(BTreeMap::from([( - String::from("key"), + KeyString::from("key"), Value::from("value"), )])))]; let input_json_size = input @@ -308,15 +308,15 @@ mod tests { let mut writer = Vec::new(); let input = vec![ Event::Log(LogEvent::from(BTreeMap::from([( - String::from("key"), + KeyString::from("key"), Value::from("value1"), )]))), Event::Log(LogEvent::from(BTreeMap::from([( - String::from("key"), + KeyString::from("key"), Value::from("value2"), )]))), Event::Log(LogEvent::from(BTreeMap::from([( - String::from("key"), + KeyString::from("key"), Value::from("value3"), )]))), ]; @@ -344,7 +344,7 @@ mod tests { let mut writer = Vec::new(); let input = Event::Log(LogEvent::from(BTreeMap::from([( - String::from("key"), + KeyString::from("key"), Value::from("value"), )]))); let input_json_size = input.estimated_json_encoded_size_of(); @@ -365,7 +365,7 @@ mod tests { let mut writer = Vec::new(); let input = Event::Log(LogEvent::from(BTreeMap::from([( - String::from("message"), + KeyString::from("message"), Value::from("value"), )]))); let input_json_size = input.estimated_json_encoded_size_of(); diff --git a/src/sources/datadog_agent/tests.rs b/src/sources/datadog_agent/tests.rs index 0c569340e5c02..dc9820a63649c 100644 --- a/src/sources/datadog_agent/tests.rs +++ b/src/sources/datadog_agent/tests.rs @@ -25,7 +25,7 @@ use vector_lib::{ metric_tags, }; use vrl::compiler::value::Collection; -use vrl::value::Kind; +use vrl::value::{Kind, ObjectMap}; use crate::schema::Definition; use crate::{ @@ -1250,12 +1250,8 @@ async fn decode_traces() { assert_eq!( trace_v2.as_map()["tags"], - Value::Object(BTreeMap::from_iter( - [ - ("a".to_string(), "tag".into()), - ("another".to_string(), "tag".into()) - ] - .into_iter() + Value::Object(ObjectMap::from_iter( + [("a".into(), "tag".into()), ("another".into(), "tag".into())].into_iter() )) ); diff --git a/src/sources/datadog_agent/traces.rs b/src/sources/datadog_agent/traces.rs index 47ac974307831..a9bef1086560c 100644 --- a/src/sources/datadog_agent/traces.rs +++ b/src/sources/datadog_agent/traces.rs @@ -13,7 +13,7 @@ use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _}; use vector_lib::EstimatedJsonEncodedSizeOf; use crate::{ - event::{Event, TraceEvent, Value}, + event::{Event, ObjectMap, TraceEvent, Value}, sources::{ datadog_agent::{ddtrace_proto, handle_request, ApiKeyQueryParams, DatadogAgentSource}, util::ErrorMessage, @@ -282,8 +282,8 @@ fn handle_dd_trace_payload_v0( Ok(enriched_events) } -fn convert_span(dd_span: ddtrace_proto::Span) -> BTreeMap { - let mut span = BTreeMap::::new(); +fn convert_span(dd_span: ddtrace_proto::Span) -> ObjectMap { + let mut span = ObjectMap::new(); span.insert("service".into(), Value::from(dd_span.service)); span.insert("name".into(), Value::from(dd_span.name)); @@ -308,8 +308,13 @@ fn convert_span(dd_span: ddtrace_proto::Span) -> BTreeMap { dd_span .metrics .into_iter() - .map(|(k, v)| (k, NotNan::new(v).map(Value::Float).unwrap_or(Value::Null))) - .collect::>(), + .map(|(k, v)| { + ( + k.into(), + NotNan::new(v).map(Value::Float).unwrap_or(Value::Null), + ) + }) + .collect::(), ), ); span.insert("type".into(), Value::from(dd_span.r#type)); @@ -319,17 +324,17 @@ fn convert_span(dd_span: ddtrace_proto::Span) -> BTreeMap { dd_span .meta_struct .into_iter() - .map(|(k, v)| (k, Value::from(bytes::Bytes::from(v)))) - .collect::>(), + .map(|(k, v)| (k.into(), Value::from(bytes::Bytes::from(v)))) + .collect::(), ), ); span } -fn convert_tags(original_map: BTreeMap) -> BTreeMap { +fn convert_tags(original_map: BTreeMap) -> ObjectMap { original_map .into_iter() - .map(|(k, v)| (k, Value::from(v))) - .collect::>() + .map(|(k, v)| (k.into(), Value::from(v))) + .collect::() } diff --git a/src/sources/file.rs b/src/sources/file.rs index ac7f7834b54fb..ea03e8e30ac10 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -1470,11 +1470,12 @@ mod tests { default_file_key() .path .expect("file key to exist") - .to_string(), - log_schema().host_key().unwrap().to_string(), - log_schema().message_key().unwrap().to_string(), - log_schema().timestamp_key().unwrap().to_string(), - log_schema().source_type_key().unwrap().to_string() + .to_string() + .into(), + log_schema().host_key().unwrap().to_string().into(), + log_schema().message_key().unwrap().to_string().into(), + log_schema().timestamp_key().unwrap().to_string().into(), + log_schema().source_type_key().unwrap().to_string().into() ] .into_iter() .collect::>() diff --git a/src/sources/fluent/message.rs b/src/sources/fluent/message.rs index bb9e3326fd7b8..dd3e27e3f84ab 100644 --- a/src/sources/fluent/message.rs +++ b/src/sources/fluent/message.rs @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, convert::TryInto}; use chrono::{serde::ts_seconds, DateTime, TimeZone, Utc}; use ordered_float::NotNan; use serde::{Deserialize, Serialize}; -use vector_lib::event::Value; +use vector_lib::event::{KeyString, ObjectMap, Value}; /// Fluent msgpack messages can be encoded in one of three ways, each with and /// without options, all using arrays to encode the top-level fields. @@ -184,18 +184,18 @@ impl From for Value { .into_iter() .filter_map(|(key, value)| { key.as_str() - .map(|k| (k.to_owned(), Value::from(FluentValue(value)))) + .map(|k| (k.into(), Value::from(FluentValue(value)))) }) .collect(), ) } rmpv::Value::Ext(code, bytes) => { - let mut fields = BTreeMap::new(); + let mut fields = ObjectMap::new(); fields.insert( - String::from("msgpack_extension_code"), + KeyString::from("msgpack_extension_code"), Value::Integer(code.into()), ); - fields.insert(String::from("bytes"), Value::Bytes(bytes.into())); + fields.insert(KeyString::from("bytes"), Value::Bytes(bytes.into())); Value::Object(fields) } } @@ -229,7 +229,7 @@ mod test { use approx::assert_relative_eq; use quickcheck::quickcheck; - use vector_lib::event::Value; + use vrl::value::{ObjectMap, Value}; use crate::sources::fluent::message::FluentValue; @@ -310,9 +310,9 @@ mod test { let actual_inner: Vec<(rmpv::Value, rmpv::Value)> = input.clone().into_iter().map(|(k,v)| (key_fn(k), val_fn(v))).collect(); let actual = rmpv::Value::Map(actual_inner); - let mut expected_inner = BTreeMap::new(); + let mut expected_inner = ObjectMap::new(); for (k,v) in input.into_iter() { - expected_inner.insert(k, Value::Integer(v)); + expected_inner.insert(k.into(), Value::Integer(v)); } let expected = Value::Object(expected_inner); @@ -346,9 +346,9 @@ mod test { fn from_ext(code: i8, bytes: Vec) -> () { let actual = rmpv::Value::Ext(code, bytes.clone()); - let mut inner = BTreeMap::new(); - inner.insert("msgpack_extension_code".to_string(), Value::Integer(code.into())); - inner.insert("bytes".to_string(), Value::Bytes(bytes.into())); + let mut inner = ObjectMap::new(); + inner.insert("msgpack_extension_code".into(), Value::Integer(code.into())); + inner.insert("bytes".into(), Value::Bytes(bytes.into())); let expected = Value::Object(inner); assert_eq!(Value::from(FluentValue(actual)), expected); diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index 508579284147c..a5d4432565813 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -631,7 +631,6 @@ mod tests { use chrono::{DateTime, Utc}; use rmp_serde::Serializer; use serde::Serialize; - use std::collections::BTreeMap; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, time::{error::Elapsed, timeout, Duration}, @@ -639,8 +638,8 @@ mod tests { use tokio_util::codec::Decoder; use vector_lib::assert_event_data_eq; use vector_lib::lookup::OwnedTargetPath; - use vector_lib::{event::Value, schema::Definition}; - use vrl::value::kind::Collection; + use vector_lib::schema::Definition; + use vrl::value::{kind::Collection, ObjectMap, Value}; use super::{message::FluentMessageOptions, *}; use crate::{ @@ -661,15 +660,15 @@ mod tests { // Decode base64: https://toolslick.com/conversion/data/messagepack-to-json fn mock_event(name: &str, timestamp: &str) -> Event { - Event::Log(LogEvent::from(BTreeMap::from([ - (String::from("message"), Value::from(name)), + Event::Log(LogEvent::from(ObjectMap::from([ + ("message".into(), Value::from(name)), ( - log_schema().source_type_key().unwrap().to_string(), + log_schema().source_type_key().unwrap().to_string().into(), Value::from(FluentConfig::NAME), ), - (String::from("tag"), Value::from("tag.name")), + ("tag".into(), Value::from("tag.name")), ( - String::from("timestamp"), + "timestamp".into(), Value::Timestamp(DateTime::parse_from_rfc3339(timestamp).unwrap().into()), ), ]))) diff --git a/src/sources/gcp_pubsub.rs b/src/sources/gcp_pubsub.rs index 52ca0e50621f3..01412f0afec9d 100644 --- a/src/sources/gcp_pubsub.rs +++ b/src/sources/gcp_pubsub.rs @@ -666,7 +666,7 @@ impl PubsubSource { message .attributes .into_iter() - .map(|(key, value)| (key, Value::Bytes(value.into()))) + .map(|(key, value)| (key.into(), Value::Bytes(value.into()))) .collect(), ); let log_namespace = self.log_namespace; @@ -1171,7 +1171,7 @@ mod integration_tests { .clone(); assert_eq!(logattr.len(), attributes.len()); for (a, b) in logattr.into_iter().zip(&attributes) { - assert_eq!(&a.0, b.0); + assert_eq!(&a.0, b.0.as_str()); assert_eq!(a.1, b.1.clone().into()); } } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 366ad0fd5bad0..2b37c71fa3f4a 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -477,7 +477,7 @@ impl HttpSource for SimpleHttpSource { #[cfg(test)] mod tests { use std::str::FromStr; - use std::{collections::BTreeMap, io::Write, net::SocketAddr}; + use std::{io::Write, net::SocketAddr}; use flate2::{ write::{GzEncoder, ZlibEncoder}, @@ -486,9 +486,6 @@ mod tests { use futures::Stream; use http::{HeaderMap, Method, StatusCode}; use similar_asserts::assert_eq; - use vrl::value::kind::Collection; - use vrl::value::Kind; - use vector_lib::codecs::{ decoding::{DeserializerConfig, FramingConfig}, BytesDecoderConfig, JsonDeserializerConfig, @@ -498,6 +495,7 @@ mod tests { use vector_lib::lookup::lookup_v2::OptionalValuePath; use vector_lib::lookup::{event_path, owned_value_path, OwnedTargetPath, PathPrefix}; use vector_lib::schema::Definition; + use vrl::value::{kind::Collection, Kind, ObjectMap}; use crate::sources::http_server::HttpMethod; use crate::{ @@ -890,8 +888,8 @@ mod tests { { let event = events.remove(0); let log = event.as_log(); - let mut map = BTreeMap::new(); - map.insert("dotted.key2".to_string(), Value::from("value2")); + let mut map = ObjectMap::new(); + map.insert("dotted.key2".into(), Value::from("value2")); assert_eq!(log["nested"], map.into()); } } diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 89da9b039c14d..564c4dc13c882 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap, HashSet}, + collections::{HashMap, HashSet}, io::Cursor, pin::Pin, sync::{ @@ -48,7 +48,7 @@ use vector_lib::{ config::{LegacyKey, LogNamespace}, EstimatedJsonEncodedSizeOf, }; -use vrl::value::{kind::Collection, Kind}; +use vrl::value::{kind::Collection, Kind, ObjectMap}; use crate::{ codecs::{Decoder, DecodingConfig}, @@ -1041,7 +1041,7 @@ impl Keys { struct ReceivedMessage { timestamp: Option>, key: Value, - headers: BTreeMap, + headers: ObjectMap, topic: String, partition: i32, offset: i64, @@ -1060,12 +1060,12 @@ impl ReceivedMessage { .map(|key| Value::from(Bytes::from(key.to_owned()))) .unwrap_or(Value::Null); - let mut headers_map = BTreeMap::new(); + let mut headers_map = ObjectMap::new(); if let Some(headers) = msg.headers() { for header in headers.iter() { if let Some(value) = header.value { headers_map.insert( - header.key.to_string(), + header.key.into(), Value::from(Bytes::from(value.to_owned())), ); } @@ -1703,8 +1703,8 @@ mod integration_test { assert_eq!(event.as_log()["topic"], topic.clone().into()); assert!(event.as_log().contains("partition")); assert!(event.as_log().contains("offset")); - let mut expected_headers = BTreeMap::new(); - expected_headers.insert(HEADER_KEY.to_string(), Value::from(HEADER_VALUE)); + let mut expected_headers = ObjectMap::new(); + expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE)); assert_eq!(event.as_log()["headers"], Value::from(expected_headers)); } else { let meta = event.as_log().metadata().value(); @@ -1738,8 +1738,8 @@ mod integration_test { assert!(meta.get(path!("kafka", "partition")).unwrap().is_integer(),); assert!(meta.get(path!("kafka", "offset")).unwrap().is_integer(),); - let mut expected_headers = BTreeMap::new(); - expected_headers.insert(HEADER_KEY.to_string(), Value::from(HEADER_VALUE)); + let mut expected_headers = ObjectMap::new(); + expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE)); assert_eq!( meta.get(path!("kafka", "headers")).unwrap(), &Value::from(expected_headers) diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index c130162b2c5cd..7545af923c3f5 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -19,7 +19,7 @@ use vector_lib::{ schema::Definition, }; use vrl::value::kind::Collection; -use vrl::value::Kind; +use vrl::value::{KeyString, Kind}; use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker}; use crate::{ @@ -434,7 +434,7 @@ impl TryFrom for LogstashFrameType { struct LogstashEventFrame { protocol: LogstashProtocolVersion, sequence_number: u32, - fields: BTreeMap, + fields: BTreeMap, } // Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md @@ -522,7 +522,7 @@ impl Decoder for LogstashDecoder { let sequence_number = rest.get_u32(); let pair_count = rest.get_u32(); - let mut fields: BTreeMap = BTreeMap::new(); + let mut fields = BTreeMap::::new(); for _ in 0..pair_count { if src.remaining() < 4 { return Ok(None); @@ -546,7 +546,7 @@ impl Decoder for LogstashDecoder { rest = right; fields.insert( - String::from_utf8_lossy(key).to_string(), + String::from_utf8_lossy(key).into(), String::from_utf8_lossy(value).into(), ); } @@ -585,7 +585,7 @@ impl Decoder for LogstashDecoder { let (slice, right) = rest.split_at(payload_size); rest = right; - let fields_result: Result, _> = + let fields_result: Result, _> = serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {}); let remaining = rest.remaining(); diff --git a/src/sources/opentelemetry/tests.rs b/src/sources/opentelemetry/tests.rs index 4d381d84582e7..4d943988981fa 100644 --- a/src/sources/opentelemetry/tests.rs +++ b/src/sources/opentelemetry/tests.rs @@ -1,9 +1,9 @@ +use std::sync::Arc; + use chrono::{TimeZone, Utc}; use futures::Stream; use futures_util::StreamExt; use similar_asserts::assert_eq; -use std::collections::BTreeMap; -use std::sync::Arc; use tonic::Request; use vector_lib::config::LogNamespace; use vector_lib::lookup::path; @@ -18,7 +18,7 @@ use vrl::value; use crate::config::OutputId; use crate::{ config::{SourceConfig, SourceContext}, - event::{into_event_stream, Event, EventStatus, LogEvent, Value}, + event::{into_event_stream, Event, EventStatus, LogEvent, ObjectMap, Value}, sources::opentelemetry::{GrpcConfig, HttpConfig, OpentelemetryConfig, LOGS}, test_util::{ self, @@ -300,10 +300,10 @@ fn str_into_hex_bytes(s: &str) -> Vec { hex::decode(s).unwrap() } -fn vec_into_btmap(arr: Vec<(&'static str, Value)>) -> BTreeMap { - BTreeMap::from_iter( +fn vec_into_btmap(arr: Vec<(&'static str, Value)>) -> ObjectMap { + ObjectMap::from_iter( arr.into_iter() - .map(|(k, v)| (k.to_string(), v)) + .map(|(k, v)| (k.into(), v)) .collect::>(), ) } diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 6bc219e636d1f..0ee6cbab3aff0 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -322,7 +322,7 @@ pub(crate) fn default_host_key() -> OptionalValuePath { mod test { use approx::assert_relative_eq; use std::{ - collections::{BTreeMap, HashMap}, + collections::HashMap, net::{SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, @@ -346,9 +346,8 @@ mod test { }; use vector_lib::event::EventContainer; use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path}; - use vrl::btreemap; - use vrl::value; - use vrl::value::Value; + use vrl::value::ObjectMap; + use vrl::{btreemap, value}; #[cfg(unix)] use { @@ -600,7 +599,7 @@ mod test { "one line".into() ); - let tls_meta: BTreeMap = btreemap!( + let tls_meta: ObjectMap = btreemap!( "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US" ); @@ -665,7 +664,7 @@ mod test { assert_eq!(log.value(), &"one line".into()); - let tls_meta: BTreeMap = btreemap!( + let tls_meta: ObjectMap = btreemap!( "subject" => "CN=localhost,OU=Vector,O=Datadog,L=New York,ST=New York,C=US" ); diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index ff33e176c4502..bd58ac4510bf8 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -435,11 +435,7 @@ fn enrich_syslog_event( #[cfg(test)] mod test { - use std::{ - collections::{BTreeMap, HashMap}, - fmt, - str::FromStr, - }; + use std::{collections::HashMap, fmt, str::FromStr}; use vector_lib::lookup::{event_path, owned_value_path, OwnedTargetPath}; use chrono::prelude::*; @@ -451,7 +447,7 @@ mod test { use vector_lib::codecs::decoding::format::Deserializer; use vector_lib::lookup::PathPrefix; use vector_lib::{config::ComponentKey, schema::Definition}; - use vrl::value::{kind::Collection, Kind, Value}; + use vrl::value::{kind::Collection, Kind, ObjectMap, Value}; use super::*; use crate::{ @@ -1428,7 +1424,7 @@ mod test { } } - fn structured_data_from_fields(fields: BTreeMap) -> StructuredData { + fn structured_data_from_fields(fields: ObjectMap) -> StructuredData { let mut structured_data = StructuredData::default(); for (key, value) in fields.into_iter() { @@ -1436,10 +1432,10 @@ mod test { .into_object() .unwrap() .into_iter() - .map(|(k, v)| (k, value_to_string(v))) + .map(|(k, v)| (k.into(), value_to_string(v))) .collect(); - structured_data.insert(key, subfields); + structured_data.insert(key.into(), subfields); } structured_data diff --git a/src/sources/util/net/tcp/mod.rs b/src/sources/util/net/tcp/mod.rs index f68e6302c4ee2..3b8fe135b6081 100644 --- a/src/sources/util/net/tcp/mod.rs +++ b/src/sources/util/net/tcp/mod.rs @@ -1,6 +1,6 @@ mod request_limiter; -use std::{collections::BTreeMap, io, mem::drop, net::SocketAddr, time::Duration}; +use std::{io, mem::drop, net::SocketAddr, time::Duration}; use bytes::Bytes; use futures::{future::BoxFuture, FutureExt, StreamExt}; @@ -22,7 +22,7 @@ use vector_lib::{ config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig}, EstimatedJsonEncodedSizeOf, }; -use vrl::value::Value; +use vrl::value::ObjectMap; use self::request_limiter::RequestLimiter; use super::SocketListenAddr; @@ -364,8 +364,8 @@ async fn handle_stream( if let Some(certificate_metadata) = &certificate_metadata { - let mut metadata: BTreeMap = BTreeMap::new(); - metadata.insert("subject".to_string(), certificate_metadata.subject().into()); + let mut metadata = ObjectMap::new(); + metadata.insert("subject".into(), certificate_metadata.subject().into()); for event in &mut events { let log = event.as_mut_log(); diff --git a/src/transforms/aws_ec2_metadata.rs b/src/transforms/aws_ec2_metadata.rs index cb81b28e35d28..2c4689c2d5321 100644 --- a/src/transforms/aws_ec2_metadata.rs +++ b/src/transforms/aws_ec2_metadata.rs @@ -760,9 +760,8 @@ mod integration_tests { test_util::{components::assert_transform_compliance, next_addr}, transforms::test::create_topology, }; - use std::collections::BTreeMap; use vector_lib::assert_event_data_eq; - use vrl::value::Value; + use vrl::value::{ObjectMap, Value}; use warp::Filter; fn ec2_metadata_address() -> String { @@ -1027,9 +1026,9 @@ mod integration_tests { expected_log.insert(format!("\"{}\"", REGION_KEY).as_str(), "us-east-1"); expected_log.insert( format!("\"{}\"", TAGS_KEY).as_str(), - BTreeMap::from([ - ("Name".to_string(), Value::from("test-instance")), - ("Test".to_string(), Value::from("test-tag")), + ObjectMap::from([ + ("Name".into(), Value::from("test-instance")), + ("Test".into(), Value::from("test-tag")), ]), ); diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index 5ac6918885dc9..722752e74c4ae 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -293,7 +293,7 @@ impl TaskTransform for Dedupe { #[cfg(test)] mod tests { - use std::{collections::BTreeMap, sync::Arc}; + use std::sync::Arc; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -303,7 +303,7 @@ mod tests { use crate::config::schema::Definition; use crate::{ - event::{Event, LogEvent, Value}, + event::{Event, LogEvent, ObjectMap, Value}, test_util::components::assert_transform_compliance, transforms::{ dedupe::{CacheConfig, DedupeConfig, FieldMatchConfig}, @@ -677,12 +677,12 @@ mod tests { let (topology, mut out) = create_topology(ReceiverStream::new(rx), transform_config).await; - let mut map1: BTreeMap = BTreeMap::new(); + let mut map1 = ObjectMap::new(); map1.insert("key".into(), "123".into()); let mut event1 = Event::Log(LogEvent::from("message")); event1.as_mut_log().insert("matched", map1); - let mut map2: BTreeMap = BTreeMap::new(); + let mut map2 = ObjectMap::new(); map2.insert("key".into(), 123.into()); let mut event2 = Event::Log(LogEvent::from("message")); event2.as_mut_log().insert("matched", map2); diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs index 8b556b784f2b6..851acbfdee1f5 100644 --- a/src/transforms/metric_to_log.rs +++ b/src/transforms/metric_to_log.rs @@ -359,7 +359,7 @@ mod tests { use super::*; use crate::event::{ metric::{MetricKind, MetricTags, MetricValue, StatisticKind, TagValue, TagValueSet}, - Metric, Value, + KeyString, Metric, Value, }; use crate::test_util::{components::assert_transform_compliance, random_string}; use crate::transforms::test::create_topology; @@ -433,12 +433,12 @@ mod tests { assert_eq!( collected, vec![ - (String::from("counter.value"), &Value::from(1.0)), - (String::from("host"), &Value::from("localhost")), - (String::from("kind"), &Value::from("absolute")), - (String::from("name"), &Value::from("counter")), - (String::from("tags.some_tag"), &Value::from("some_value")), - (String::from("timestamp"), &Value::from(ts())), + (KeyString::from("counter.value"), &Value::from(1.0)), + (KeyString::from("host"), &Value::from("localhost")), + (KeyString::from("kind"), &Value::from("absolute")), + (KeyString::from("name"), &Value::from("counter")), + (KeyString::from("tags.some_tag"), &Value::from("some_value")), + (KeyString::from("timestamp"), &Value::from(ts())), ] ); assert_eq!(log.metadata(), &metadata); @@ -464,10 +464,10 @@ mod tests { assert_eq!( collected, vec![ - (String::from("gauge.value"), &Value::from(1.0)), - (String::from("kind"), &Value::from("absolute")), - (String::from("name"), &Value::from("gauge")), - (String::from("timestamp"), &Value::from(ts())), + (KeyString::from("gauge.value"), &Value::from(1.0)), + (KeyString::from("kind"), &Value::from("absolute")), + (KeyString::from("name"), &Value::from("gauge")), + (KeyString::from("timestamp"), &Value::from(ts())), ] ); assert_eq!(log.metadata(), &metadata); @@ -495,11 +495,11 @@ mod tests { assert_eq!( collected, vec![ - (String::from("kind"), &Value::from("absolute")), - (String::from("name"), &Value::from("set")), - (String::from("set.values[0]"), &Value::from("one")), - (String::from("set.values[1]"), &Value::from("two")), - (String::from("timestamp"), &Value::from(ts())), + (KeyString::from("kind"), &Value::from("absolute")), + (KeyString::from("name"), &Value::from("set")), + (KeyString::from("set.values[0]"), &Value::from("one")), + (KeyString::from("set.values[1]"), &Value::from("two")), + (KeyString::from("timestamp"), &Value::from(ts())), ] ); assert_eq!(log.metadata(), &metadata); @@ -529,28 +529,28 @@ mod tests { collected, vec![ ( - String::from("distribution.samples[0].rate"), + KeyString::from("distribution.samples[0].rate"), &Value::from(10) ), ( - String::from("distribution.samples[0].value"), + KeyString::from("distribution.samples[0].value"), &Value::from(1.0) ), ( - String::from("distribution.samples[1].rate"), + KeyString::from("distribution.samples[1].rate"), &Value::from(20) ), ( - String::from("distribution.samples[1].value"), + KeyString::from("distribution.samples[1].value"), &Value::from(2.0) ), ( - String::from("distribution.statistic"), + KeyString::from("distribution.statistic"), &Value::from("histogram") ), - (String::from("kind"), &Value::from("absolute")), - (String::from("name"), &Value::from("distro")), - (String::from("timestamp"), &Value::from(ts())), + (KeyString::from("kind"), &Value::from("absolute")), + (KeyString::from("name"), &Value::from("distro")), + (KeyString::from("timestamp"), &Value::from(ts())), ] ); assert_eq!(log.metadata(), &metadata); @@ -581,26 +581,32 @@ mod tests { collected, vec![ ( - String::from("aggregated_histogram.buckets[0].count"), + KeyString::from("aggregated_histogram.buckets[0].count"), &Value::from(10) ), ( - String::from("aggregated_histogram.buckets[0].upper_limit"), + KeyString::from("aggregated_histogram.buckets[0].upper_limit"), &Value::from(1.0) ), ( - String::from("aggregated_histogram.buckets[1].count"), + KeyString::from("aggregated_histogram.buckets[1].count"), &Value::from(20) ), ( - String::from("aggregated_histogram.buckets[1].upper_limit"), + KeyString::from("aggregated_histogram.buckets[1].upper_limit"), &Value::from(2.0) ), - (String::from("aggregated_histogram.count"), &Value::from(30)), - (String::from("aggregated_histogram.sum"), &Value::from(50.0)), - (String::from("kind"), &Value::from("absolute")), - (String::from("name"), &Value::from("histo")), - (String::from("timestamp"), &Value::from(ts())), + ( + KeyString::from("aggregated_histogram.count"), + &Value::from(30) + ), + ( + KeyString::from("aggregated_histogram.sum"), + &Value::from(50.0) + ), + (KeyString::from("kind"), &Value::from("absolute")), + (KeyString::from("name"), &Value::from("histo")), + (KeyString::from("timestamp"), &Value::from(ts())), ] ); assert_eq!(log.metadata(), &metadata); @@ -630,27 +636,33 @@ mod tests { assert_eq!( collected, vec![ - (String::from("aggregated_summary.count"), &Value::from(30)), ( - String::from("aggregated_summary.quantiles[0].quantile"), + KeyString::from("aggregated_summary.count"), + &Value::from(30) + ), + ( + KeyString::from("aggregated_summary.quantiles[0].quantile"), &Value::from(50.0) ), ( - String::from("aggregated_summary.quantiles[0].value"), + KeyString::from("aggregated_summary.quantiles[0].value"), &Value::from(10.0) ), ( - String::from("aggregated_summary.quantiles[1].quantile"), + KeyString::from("aggregated_summary.quantiles[1].quantile"), &Value::from(90.0) ), ( - String::from("aggregated_summary.quantiles[1].value"), + KeyString::from("aggregated_summary.quantiles[1].value"), &Value::from(20.0) ), - (String::from("aggregated_summary.sum"), &Value::from(50.0)), - (String::from("kind"), &Value::from("absolute")), - (String::from("name"), &Value::from("summary")), - (String::from("timestamp"), &Value::from(ts())), + ( + KeyString::from("aggregated_summary.sum"), + &Value::from(50.0) + ), + (KeyString::from("kind"), &Value::from("absolute")), + (KeyString::from("name"), &Value::from("summary")), + (KeyString::from("timestamp"), &Value::from(ts())), ] ); assert_eq!(log.metadata(), &metadata); diff --git a/src/transforms/reduce/merge_strategy.rs b/src/transforms/reduce/merge_strategy.rs index 2580636f09cec..37f98d0cf4789 100644 --- a/src/transforms/reduce/merge_strategy.rs +++ b/src/transforms/reduce/merge_strategy.rs @@ -6,7 +6,7 @@ use ordered_float::NotNan; use vector_lib::configurable::configurable_component; use vrl::event_path; -use crate::event::{LogEvent, Value}; +use crate::event::{KeyString, LogEvent, Value}; /// Strategies for merging events. #[configurable_component] @@ -68,7 +68,7 @@ impl ReduceValueMerger for DiscardMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), self.v); Ok(()) } @@ -94,7 +94,7 @@ impl ReduceValueMerger for RetainMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), self.v); Ok(()) } @@ -134,7 +134,7 @@ impl ReduceValueMerger for ConcatMerger { } } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Bytes(self.v.into())); Ok(()) } @@ -161,7 +161,7 @@ impl ReduceValueMerger for ConcatArrayMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Array(self.v)); Ok(()) } @@ -184,7 +184,7 @@ impl ReduceValueMerger for ArrayMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Array(self.v)); Ok(()) } @@ -216,7 +216,7 @@ impl ReduceValueMerger for LongestArrayMerger { } } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Array(self.v)); Ok(()) } @@ -248,7 +248,7 @@ impl ReduceValueMerger for ShortestArrayMerger { } } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert(event_path!(k.as_str()), Value::Array(self.v)); Ok(()) } @@ -293,7 +293,7 @@ impl ReduceValueMerger for FlatUniqueMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert( event_path!(k.as_str()), Value::Array(self.v.into_iter().collect()), @@ -330,7 +330,7 @@ impl ReduceValueMerger for TimestampWindowMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { v.insert( event_path!(format!("{}_end", k).as_str()), Value::Timestamp(self.latest), @@ -394,7 +394,7 @@ impl ReduceValueMerger for AddNumbersMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(event_path!(k.as_str()), Value::Float(f)), NumberMergerValue::Int(i) => v.insert(event_path!(k.as_str()), Value::Integer(i)), @@ -453,7 +453,7 @@ impl ReduceValueMerger for MaxNumberMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(event_path!(k.as_str()), Value::Float(f)), NumberMergerValue::Int(i) => v.insert(event_path!(k.as_str()), Value::Integer(i)), @@ -512,7 +512,7 @@ impl ReduceValueMerger for MinNumberMerger { Ok(()) } - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String> { + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String> { match self.v { NumberMergerValue::Float(f) => v.insert(event_path!(k.as_str()), Value::Float(f)), NumberMergerValue::Int(i) => v.insert(event_path!(k.as_str()), Value::Integer(i)), @@ -523,7 +523,7 @@ impl ReduceValueMerger for MinNumberMerger { pub trait ReduceValueMerger: std::fmt::Debug + Send + Sync { fn add(&mut self, v: Value) -> Result<(), String>; - fn insert_into(self: Box, k: String, v: &mut LogEvent) -> Result<(), String>; + fn insert_into(self: Box, k: KeyString, v: &mut LogEvent) -> Result<(), String>; } impl From for Box { diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index bf6d4489ed91f..500b9f82e8c61 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -30,7 +30,7 @@ pub use merge_strategy::*; use vector_lib::config::LogNamespace; use vector_lib::stream::expiration_map::{map_with_expiration, Emitter}; use vrl::value::kind::Collection; -use vrl::value::Kind; +use vrl::value::{KeyString, Kind}; /// Configuration for the `reduce` transform. #[serde_as] @@ -91,7 +91,7 @@ pub struct ReduceConfig { #[configurable(metadata( docs::additional_props_description = "An individual merge strategy." ))] - pub merge_strategies: IndexMap, + pub merge_strategies: IndexMap, /// A condition used to distinguish the final event of a transaction. /// @@ -237,7 +237,7 @@ impl TransformConfig for ReduceConfig { #[derive(Debug)] struct ReduceState { events: usize, - fields: HashMap>, + fields: HashMap>, stale_since: Instant, metadata: EventMetadata, } @@ -255,7 +255,7 @@ impl ReduceState { } } - fn add_event(&mut self, e: LogEvent, strategies: &IndexMap) { + fn add_event(&mut self, e: LogEvent, strategies: &IndexMap) { let (value, metadata) = e.into_parts(); self.metadata.merge(metadata); @@ -309,7 +309,7 @@ pub struct Reduce { expire_after: Duration, flush_period: Duration, group_by: Vec, - merge_strategies: IndexMap, + merge_strategies: IndexMap, reduce_merge_states: HashMap, ends_when: Option, starts_when: Option,