From a111ed49bf52e5a2233d179c7816262173efbef0 Mon Sep 17 00:00:00 2001 From: Stephen Wakely Date: Wed, 5 Apr 2023 10:58:48 +0100 Subject: [PATCH] Transform outputs hash table of OutputId -> Definition Signed-off-by: Stephen Wakely --- benches/remap.rs | 6 +- lib/vector-core/src/config/mod.rs | 20 +++- lib/vector-core/src/config/output_id.rs | 90 +++++++++++++++++ src/config/graph.rs | 17 ++-- src/config/id.rs | 89 +---------------- src/config/mod.rs | 4 +- src/config/transform.rs | 8 +- src/test_util/mock/transforms/basic.rs | 2 +- src/test_util/mock/transforms/noop.rs | 2 +- src/topology/schema.rs | 96 ++++++++++++------- src/transforms/aggregate.rs | 2 +- src/transforms/aws_ec2_metadata.rs | 4 +- src/transforms/dedupe.rs | 7 +- src/transforms/filter.rs | 7 +- src/transforms/log_to_metric.rs | 4 +- src/transforms/lua/v1/mod.rs | 12 ++- src/transforms/lua/v2/mod.rs | 12 ++- src/transforms/metric_to_log.rs | 10 +- src/transforms/reduce/mod.rs | 10 +- src/transforms/remap.rs | 45 +++++---- src/transforms/route.rs | 33 +++---- src/transforms/sample.rs | 2 +- .../tag_cardinality_limit/config.rs | 4 +- src/transforms/throttle.rs | 7 +- 24 files changed, 281 insertions(+), 212 deletions(-) create mode 100644 lib/vector-core/src/config/output_id.rs diff --git a/benches/remap.rs b/benches/remap.rs index d3c2c6930a144..3523c541b09ed 100644 --- a/benches/remap.rs +++ b/benches/remap.rs @@ -28,7 +28,7 @@ fn benchmark_remap(c: &mut Criterion) { let add_fields_runner = |tform: &mut Box, event: Event| { let mut outputs = TransformOutputsBuf::new_with_capacity( - vec![TransformOutput::new(DataType::all(), vec![])], + vec![TransformOutput::new(DataType::all(), HashMap::new())], 1, ); tform.transform(event, &mut outputs); @@ -80,7 +80,7 @@ fn benchmark_remap(c: &mut Criterion) { let json_parser_runner = |tform: &mut Box, event: Event| { let mut outputs = TransformOutputsBuf::new_with_capacity( - vec![TransformOutput::new(DataType::all(), vec![])], + vec![TransformOutput::new(DataType::all(), HashMap::new())], 1, ); tform.transform(event, &mut outputs); @@ -134,7 +134,7 @@ fn benchmark_remap(c: &mut Criterion) { let coerce_runner = |tform: &mut Box, event: Event, timestamp: DateTime| { let mut outputs = TransformOutputsBuf::new_with_capacity( - vec![TransformOutput::new(DataType::all(), vec![])], + vec![TransformOutput::new(DataType::all(), HashMap::new())], 1, ); tform.transform(event, &mut outputs); diff --git a/lib/vector-core/src/config/mod.rs b/lib/vector-core/src/config/mod.rs index 2b9e221e17a65..7669643d02eca 100644 --- a/lib/vector-core/src/config/mod.rs +++ b/lib/vector-core/src/config/mod.rs @@ -1,4 +1,4 @@ -use std::{fmt, num::NonZeroUsize}; +use std::{collections::HashMap, fmt, num::NonZeroUsize}; use bitmask_enum::bitmask; use bytes::Bytes; @@ -6,12 +6,14 @@ use chrono::{DateTime, Utc}; mod global_options; mod log_schema; +pub mod output_id; pub mod proxy; use crate::event::LogEvent; pub use global_options::GlobalOptions; pub use log_schema::{init_log_schema, log_schema, LogSchema}; use lookup::{lookup_v2::ValuePath, path, PathPrefix}; +pub use output_id::OutputId; use serde::{Deserialize, Serialize}; use value::Value; pub use vector_common::config::ComponentKey; @@ -199,14 +201,14 @@ pub struct TransformOutput { /// enabled, at least one definition should be output. If the transform /// has multiple connected sources, it is possible to have multiple output /// definitions - one for each input. - pub log_schema_definitions: Vec, + pub log_schema_definitions: HashMap, } impl TransformOutput { /// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s. /// Designed for use in transforms. #[must_use] - pub fn new(ty: DataType, schema_definitions: Vec) -> Self { + pub fn new(ty: DataType, schema_definitions: HashMap) -> Self { Self { port: None, ty, @@ -222,6 +224,18 @@ impl TransformOutput { } } +/// Simple utility function that can be used by transforms that make no changes to +/// the schema definitions of events. +/// Takes a list of definitions with [`OutputId`] returns them as a [`HashMap`]. +pub fn clone_input_definitions( + input_definitions: &[(OutputId, schema::Definition)], +) -> HashMap { + input_definitions + .iter() + .map(|(output, definition)| (output.clone(), definition.clone())) + .collect() +} + /// Source-specific end-to-end acknowledgements configuration. /// /// This type exists solely to provide a source-specific description of the `acknowledgements` diff --git a/lib/vector-core/src/config/output_id.rs b/lib/vector-core/src/config/output_id.rs new file mode 100644 index 0000000000000..81c35279f6db2 --- /dev/null +++ b/lib/vector-core/src/config/output_id.rs @@ -0,0 +1,90 @@ +use std::fmt; + +use vector_common::config::ComponentKey; + +use crate::{config::configurable_component, schema}; + +/// Component output identifier. +#[configurable_component] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +pub struct OutputId { + /// The component to which the output belongs. + pub component: ComponentKey, + + /// The output port name, if not the default. + pub port: Option, +} + +impl OutputId { + /// Some situations, for example when validating a config file requires running the + /// `transforms::output` function to retrieve the outputs, but we don't have an + /// `OutputId` from a source. This gives us an `OutputId` that we can use. + /// + /// TODO: This is not a pleasant solution, but would require some significant refactoring + /// to the topology code to avoid. + pub fn dummy() -> Self { + Self { + component: "dummy".into(), + port: None, + } + } + + /// Given a list of [`schema::Definition`]s, returns a [`Vec`] of tuples of + /// this `OutputId` with each `Definition`. + pub fn with_definitions( + &self, + definitions: impl IntoIterator, + ) -> Vec<(OutputId, schema::Definition)> { + definitions + .into_iter() + .map(|definition| (self.clone(), definition)) + .collect() + } +} + +impl fmt::Display for OutputId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.port { + None => self.component.fmt(f), + Some(port) => write!(f, "{}.{port}", self.component), + } + } +} + +impl From for OutputId { + fn from(key: ComponentKey) -> Self { + Self { + component: key, + port: None, + } + } +} + +impl From<&ComponentKey> for OutputId { + fn from(key: &ComponentKey) -> Self { + Self::from(key.clone()) + } +} + +impl From<(&ComponentKey, String)> for OutputId { + fn from((key, name): (&ComponentKey, String)) -> Self { + Self { + component: key.clone(), + port: Some(name), + } + } +} + +// This panicking implementation is convenient for testing, but should never be enabled for use +// outside of tests. +#[cfg(any(test, feature = "test"))] +impl From<&str> for OutputId { + fn from(s: &str) -> Self { + assert!( + !s.contains('.'), + "Cannot convert dotted paths to strings without more context" + ); + let component = ComponentKey::from(s); + component.into() + } +} diff --git a/src/config/graph.rs b/src/config/graph.rs index 6f4fabbaf4a2b..643f47d885951 100644 --- a/src/config/graph.rs +++ b/src/config/graph.rs @@ -399,7 +399,7 @@ mod test { in_ty, outputs: vec![TransformOutput::new( out_ty, - vec![Definition::default_legacy_namespace()], + [("test".into(), Definition::default_legacy_namespace())].into(), )], }, ); @@ -415,8 +415,11 @@ mod test { let id = id.into(); match self.nodes.get_mut(&id) { Some(Node::Transform { outputs, .. }) => outputs.push( - TransformOutput::new(ty, vec![Definition::default_legacy_namespace()]) - .with_port(name), + TransformOutput::new( + ty, + [("test".into(), Definition::default_legacy_namespace())].into(), + ) + .with_port(name), ), _ => panic!("invalid transform"), } @@ -651,11 +654,11 @@ mod test { outputs: vec![ TransformOutput::new( DataType::all(), - vec![Definition::default_legacy_namespace()], + [("test".into(), Definition::default_legacy_namespace())].into(), ), TransformOutput::new( DataType::all(), - vec![Definition::default_legacy_namespace()], + [("test".into(), Definition::default_legacy_namespace())].into(), ) .with_port("bar"), ], @@ -676,11 +679,11 @@ mod test { outputs: vec![ TransformOutput::new( DataType::all(), - vec![Definition::default_legacy_namespace()], + [("test".into(), Definition::default_legacy_namespace())].into(), ), TransformOutput::new( DataType::all(), - vec![Definition::default_legacy_namespace()], + [("test".into(), Definition::default_legacy_namespace())].into(), ) .with_port("errors"), ], diff --git a/src/config/id.rs b/src/config/id.rs index caa561b68352d..8355fd22f1033 100644 --- a/src/config/id.rs +++ b/src/config/id.rs @@ -1,10 +1,8 @@ -use std::{fmt, ops::Deref}; +use std::ops::Deref; use vector_config::configurable_component; pub use vector_core::config::ComponentKey; -use super::schema; - /// A list of upstream [source][sources] or [transform][transforms] IDs. /// /// Wildcards (`*`) are supported. @@ -96,88 +94,3 @@ impl From> for Inputs { Self(inputs) } } - -/// Component output identifier. -#[configurable_component] -#[derive(Clone, Debug, Eq, Hash, PartialEq)] -pub struct OutputId { - /// The component to which the output belongs. - pub component: ComponentKey, - - /// The output port name, if not the default. - pub port: Option, -} - -impl OutputId { - /// Some situations, for example when validating a config file requires running the - /// transforms::output function to retrieve the outputs, but we don't have an - /// `OutputId` from a source. This gives us an `OutputId` that we can use. - /// - /// TODO: This is not a pleasant solution, but would require some significant refactoring - /// to the topology code to avoid. - pub fn dummy() -> Self { - Self { - component: "dummy".into(), - port: None, - } - } - - /// Given a list of [`schema::Definition`]s, returns a [`Vec`] of tuples of - /// this `OutputId` with each `Definition`. - pub fn with_definitions( - &self, - definitions: impl IntoIterator, - ) -> Vec<(OutputId, schema::Definition)> { - definitions - .into_iter() - .map(|definition| (self.clone(), definition)) - .collect() - } -} - -impl fmt::Display for OutputId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.port { - None => self.component.fmt(f), - Some(port) => write!(f, "{}.{}", self.component, port), - } - } -} - -impl From for OutputId { - fn from(key: ComponentKey) -> Self { - Self { - component: key, - port: None, - } - } -} - -impl From<&ComponentKey> for OutputId { - fn from(key: &ComponentKey) -> Self { - Self::from(key.clone()) - } -} - -impl From<(&ComponentKey, String)> for OutputId { - fn from((key, name): (&ComponentKey, String)) -> Self { - Self { - component: key.clone(), - port: Some(name), - } - } -} - -// This panicking implementation is convenient for testing, but should never be enabled for use -// outside of tests. -#[cfg(test)] -impl From<&str> for OutputId { - fn from(s: &str) -> Self { - assert!( - !s.contains('.'), - "Cannot convert dotted paths to strings without more context" - ); - let component = ComponentKey::from(s); - component.into() - } -} diff --git a/src/config/mod.rs b/src/config/mod.rs index 84d7dace6584e..8b044dcdaeaa3 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -45,7 +45,7 @@ pub use cmd::{cmd, Opts}; pub use diff::ConfigDiff; pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter}; pub use format::{Format, FormatHint}; -pub use id::{ComponentKey, Inputs, OutputId}; +pub use id::{ComponentKey, Inputs}; pub use loading::{ load, load_builder_from_paths, load_from_paths, load_from_paths_with_provider_and_secrets, load_from_str, load_source_from_paths, merge_path_lists, process_paths, CONFIG_PATHS, @@ -57,7 +57,7 @@ pub use source::{SourceConfig, SourceContext, SourceOuter}; pub use transform::{BoxedTransform, TransformConfig, TransformContext, TransformOuter}; pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult}; pub use validation::warnings; -pub use vector_core::config::{log_schema, proxy::ProxyConfig, LogSchema}; +pub use vector_core::config::{log_schema, proxy::ProxyConfig, LogSchema, OutputId}; /// Loads Log Schema from configurations and sets global schema. /// Once this is done, configurations can be correctly loaded using diff --git a/src/config/transform.rs b/src/config/transform.rs index f3271c3422e7b..e2447a70e8ad1 100644 --- a/src/config/transform.rs +++ b/src/config/transform.rs @@ -111,7 +111,7 @@ pub struct TransformContext { /// /// Given a transform can expose multiple [`TransformOutput`] channels, the ID is tied to the identifier of /// that `TransformOutput`. - pub schema_definitions: HashMap, Vec>, + pub schema_definitions: HashMap, HashMap>, /// The schema definition created by merging all inputs of the transform. /// @@ -129,7 +129,7 @@ impl Default for TransformContext { key: Default::default(), globals: Default::default(), enrichment_tables: Default::default(), - schema_definitions: HashMap::from([(None, vec![schema::Definition::any()])]), + schema_definitions: HashMap::from([(None, HashMap::new())]), merged_schema_definition: schema::Definition::any(), schema: SchemaOptions::default(), } @@ -148,7 +148,9 @@ impl TransformContext { } #[cfg(any(test, feature = "test"))] - pub fn new_test(schema_definitions: HashMap, Vec>) -> Self { + pub fn new_test( + schema_definitions: HashMap, HashMap>, + ) -> Self { Self { schema_definitions, ..Default::default() diff --git a/src/test_util/mock/transforms/basic.rs b/src/test_util/mock/transforms/basic.rs index ce8673c408994..90e4484a547bd 100644 --- a/src/test_util/mock/transforms/basic.rs +++ b/src/test_util/mock/transforms/basic.rs @@ -58,7 +58,7 @@ impl TransformConfig for BasicTransformConfig { DataType::all(), definitions .iter() - .map(|(_output, definition)| definition.clone()) + .map(|(output, definition)| (output.clone(), definition.clone())) .collect(), )] } diff --git a/src/test_util/mock/transforms/noop.rs b/src/test_util/mock/transforms/noop.rs index b6712e4eec21f..18aadec304d03 100644 --- a/src/test_util/mock/transforms/noop.rs +++ b/src/test_util/mock/transforms/noop.rs @@ -48,7 +48,7 @@ impl TransformConfig for NoopTransformConfig { DataType::all(), definitions .iter() - .map(|(_output, definition)| definition.clone()) + .map(|(output, definition)| (output.clone(), definition.clone())) .collect(), )] } diff --git a/src/topology/schema.rs b/src/topology/schema.rs index 8fdbeb6f729c7..2f39b5a07bd74 100644 --- a/src/topology/schema.rs +++ b/src/topology/schema.rs @@ -64,7 +64,9 @@ pub fn possible_definitions( &input.port ) }) - .log_schema_definitions, + .log_schema_definitions + .values() + .cloned(), ); definitions.append(&mut transform_definition); @@ -143,7 +145,9 @@ pub(super) fn expanded_definitions( .iter() .find_map(|output| { if output.port == input.port { - Some(input.with_definitions(output.log_schema_definitions.clone())) + Some( + input.with_definitions(output.log_schema_definitions.values().cloned()), + ) } else { None } @@ -216,7 +220,9 @@ pub(crate) fn input_definitions( &input.port ) }) - .log_schema_definitions, + .log_schema_definitions + .values() + .cloned(), ); definitions.append(&mut transform_definitions); @@ -530,11 +536,15 @@ mod tests { vec![OutputId::from("source-foo")], vec![TransformOutput::new( DataType::all(), - vec![Definition::empty_legacy_namespace().with_event_field( - &owned_value_path!("baz"), - Kind::regex(), - Some("baz"), - )], + [( + "source-foo".into(), + Definition::empty_legacy_namespace().with_event_field( + &owned_value_path!("baz"), + Kind::regex(), + Some("baz"), + ), + )] + .into(), )], ), )]), @@ -601,11 +611,15 @@ mod tests { vec![OutputId::from("Source 1")], vec![TransformOutput::new( DataType::all(), - vec![Definition::empty_legacy_namespace().with_event_field( - &owned_value_path!("transform-1"), - Kind::regex(), - None, - )], + [( + "Source 1".into(), + Definition::empty_legacy_namespace().with_event_field( + &owned_value_path!("transform-1"), + Kind::regex(), + None, + ), + )] + .into(), )], ), ), @@ -615,11 +629,15 @@ mod tests { vec![OutputId::from("Source 2")], vec![TransformOutput::new( DataType::all(), - vec![Definition::empty_legacy_namespace().with_event_field( - &owned_value_path!("transform-2"), - Kind::float().or_null(), - Some("transform-2"), - )], + [( + "Source 2".into(), + Definition::empty_legacy_namespace().with_event_field( + &owned_value_path!("transform-2"), + Kind::float().or_null(), + Some("transform-2"), + ), + )] + .into(), )], ), ), @@ -629,11 +647,15 @@ mod tests { vec![OutputId::from("Source 2")], vec![TransformOutput::new( DataType::all(), - vec![Definition::empty_legacy_namespace().with_event_field( - &owned_value_path!("transform-3"), - Kind::integer(), - Some("transform-3"), - )], + [( + "Source 2".into(), + Definition::empty_legacy_namespace().with_event_field( + &owned_value_path!("transform-3"), + Kind::integer(), + Some("transform-3"), + ), + )] + .into(), )], ), ), @@ -643,11 +665,15 @@ mod tests { vec![OutputId::from("Source 2")], vec![TransformOutput::new( DataType::all(), - vec![Definition::empty_legacy_namespace().with_event_field( - &owned_value_path!("transform-4"), - Kind::timestamp().or_bytes(), - Some("transform-4"), - )], + [( + "Source 2".into(), + Definition::empty_legacy_namespace().with_event_field( + &owned_value_path!("transform-4"), + Kind::timestamp().or_bytes(), + Some("transform-4"), + ), + )] + .into(), )], ), ), @@ -657,11 +683,15 @@ mod tests { vec![OutputId::from("Transform 3"), OutputId::from("Transform 4")], vec![TransformOutput::new( DataType::all(), - vec![Definition::empty_legacy_namespace().with_event_field( - &owned_value_path!("transform-5"), - Kind::boolean(), - Some("transform-5"), - )], + [( + "Transform 3".into(), + Definition::empty_legacy_namespace().with_event_field( + &owned_value_path!("transform-5"), + Kind::boolean(), + Some("transform-5"), + ), + )] + .into(), )], ), ), diff --git a/src/transforms/aggregate.rs b/src/transforms/aggregate.rs index fdeb73eaa00a8..95212582601e0 100644 --- a/src/transforms/aggregate.rs +++ b/src/transforms/aggregate.rs @@ -51,7 +51,7 @@ impl TransformConfig for AggregateConfig { _: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { - vec![TransformOutput::new(DataType::Metric, vec![])] + vec![TransformOutput::new(DataType::Metric, HashMap::new())] } } diff --git a/src/transforms/aws_ec2_metadata.rs b/src/transforms/aws_ec2_metadata.rs index 560e82ea917be..9a66140de50fe 100644 --- a/src/transforms/aws_ec2_metadata.rs +++ b/src/transforms/aws_ec2_metadata.rs @@ -270,7 +270,7 @@ impl TransformConfig for Ec2Metadata { let schema_definition = input_definitions .iter() - .map(|(_output, definition)| { + .map(|(output, definition)| { let mut schema_definition = definition.clone(); for path in paths { @@ -278,7 +278,7 @@ impl TransformConfig for Ec2Metadata { schema_definition.with_field(path, Kind::bytes().or_undefined(), None); } - schema_definition + (output.clone(), schema_definition) }) .collect(); diff --git a/src/transforms/dedupe.rs b/src/transforms/dedupe.rs index c0fe99f0204cd..c62f40b31a339 100644 --- a/src/transforms/dedupe.rs +++ b/src/transforms/dedupe.rs @@ -4,7 +4,7 @@ use bytes::Bytes; use futures::{Stream, StreamExt}; use lru::LruCache; use vector_config::configurable_component; -use vector_core::config::LogNamespace; +use vector_core::config::{clone_input_definitions, LogNamespace}; use crate::{ config::{ @@ -160,10 +160,7 @@ impl TransformConfig for DedupeConfig { ) -> Vec { vec![TransformOutput::new( DataType::Log, - input_definitions - .iter() - .map(|(_output, definition)| definition.clone()) - .collect(), + clone_input_definitions(input_definitions), )] } } diff --git a/src/transforms/filter.rs b/src/transforms/filter.rs index f0bed3c180bc6..9351c1d3c724b 100644 --- a/src/transforms/filter.rs +++ b/src/transforms/filter.rs @@ -1,6 +1,6 @@ use vector_common::internal_event::{Count, InternalEventHandle as _, Registered}; use vector_config::configurable_component; -use vector_core::config::LogNamespace; +use vector_core::config::{clone_input_definitions, LogNamespace}; use crate::{ conditions::{AnyCondition, Condition}, @@ -58,10 +58,7 @@ impl TransformConfig for FilterConfig { ) -> Vec { vec![TransformOutput::new( DataType::all(), - input_definitions - .iter() - .map(|(_output, definition)| definition.clone()) - .collect(), + clone_input_definitions(input_definitions), )] } diff --git a/src/transforms/log_to_metric.rs b/src/transforms/log_to_metric.rs index 1b0c7e2369049..16244eb92d28b 100644 --- a/src/transforms/log_to_metric.rs +++ b/src/transforms/log_to_metric.rs @@ -1,4 +1,4 @@ -use std::num::ParseFloatError; +use std::{collections::HashMap, num::ParseFloatError}; use chrono::Utc; use indexmap::IndexMap; @@ -163,7 +163,7 @@ impl TransformConfig for LogToMetricConfig { _: LogNamespace, ) -> Vec { // Converting the log to a metric means we lose all incoming `Definition`s. - vec![TransformOutput::new(DataType::Metric, Vec::new())] + vec![TransformOutput::new(DataType::Metric, HashMap::new())] } fn enable_concurrency(&self) -> bool { diff --git a/src/transforms/lua/v1/mod.rs b/src/transforms/lua/v1/mod.rs index efab62a686dcd..4aab930ede76b 100644 --- a/src/transforms/lua/v1/mod.rs +++ b/src/transforms/lua/v1/mod.rs @@ -58,9 +58,17 @@ impl LuaConfig { .flat_map(|(_output, definition)| definition.log_namespaces().clone()) .collect(); - let definition = Definition::default_for_namespace(&namespaces); + let definition = input_definitions + .iter() + .map(|(output, _definition)| { + ( + output.clone(), + Definition::default_for_namespace(&namespaces), + ) + }) + .collect(); - vec![TransformOutput::new(DataType::Log, vec![definition])] + vec![TransformOutput::new(DataType::Log, definition)] } } diff --git a/src/transforms/lua/v2/mod.rs b/src/transforms/lua/v2/mod.rs index 88bf5fd2086f4..caa4f7d92a775 100644 --- a/src/transforms/lua/v2/mod.rs +++ b/src/transforms/lua/v2/mod.rs @@ -188,11 +188,19 @@ impl LuaConfig { .flat_map(|(_output, definition)| definition.log_namespaces().clone()) .collect(); - let definition = Definition::default_for_namespace(&namespaces); + let definition = input_definitions + .iter() + .map(|(output, _definition)| { + ( + output.clone(), + Definition::default_for_namespace(&namespaces), + ) + }) + .collect(); vec![TransformOutput::new( DataType::Metric | DataType::Log, - vec![definition], + definition, )] } } diff --git a/src/transforms/metric_to_log.rs b/src/transforms/metric_to_log.rs index 1cbf987f947af..687c166dba9df 100644 --- a/src/transforms/metric_to_log.rs +++ b/src/transforms/metric_to_log.rs @@ -94,7 +94,7 @@ impl TransformConfig for MetricToLogConfig { fn outputs( &self, - _: &[(OutputId, Definition)], + input_definitions: &[(OutputId, Definition)], global_log_namespace: LogNamespace, ) -> Vec { let log_namespace = global_log_namespace.merge(self.log_namespace); @@ -229,7 +229,13 @@ impl TransformConfig for MetricToLogConfig { } } - vec![TransformOutput::new(DataType::Log, vec![schema_definition])] + vec![TransformOutput::new( + DataType::Log, + input_definitions + .iter() + .map(|(output, _)| (output.clone(), schema_definition.clone())) + .collect(), + )] } fn enable_concurrency(&self) -> bool { diff --git a/src/transforms/reduce/mod.rs b/src/transforms/reduce/mod.rs index 53230c348a508..0e8d76f38c3e1 100644 --- a/src/transforms/reduce/mod.rs +++ b/src/transforms/reduce/mod.rs @@ -131,9 +131,9 @@ impl TransformConfig for ReduceConfig { input_definitions: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { - let mut output_definitions = Vec::new(); + let mut output_definitions = HashMap::new(); - for (_output, input) in input_definitions { + for (output, input) in input_definitions { let mut schema_definition = input.clone(); for (key, merge_strategy) in self.merge_strategies.iter() { @@ -218,7 +218,7 @@ impl TransformConfig for ReduceConfig { schema_definition = schema_definition.with_field(&key, new_kind, None); } - output_definitions.push(schema_definition); + output_definitions.insert(output.clone(), schema_definition); } vec![TransformOutput::new(DataType::Log, output_definitions)] @@ -563,7 +563,7 @@ group_by = [ "request_id" ] assert_eq!(output_1["counter"], Value::from(8)); assert_eq!(output_1.metadata(), &metadata_1); schema_definitions - .iter() + .values() .for_each(|definition| definition.assert_valid_for_event(&output_1.clone().into())); let output_2 = out.recv().await.unwrap().into_log(); @@ -572,7 +572,7 @@ group_by = [ "request_id" ] assert_eq!(output_2["counter"], Value::from(7)); assert_eq!(output_2.metadata(), &metadata_2); schema_definitions - .iter() + .values() .for_each(|definition| definition.assert_valid_for_event(&output_2.clone().into())); drop(tx); diff --git a/src/transforms/remap.rs b/src/transforms/remap.rs index ab13c59b51db7..5ad070cb7d2b0 100644 --- a/src/transforms/remap.rs +++ b/src/transforms/remap.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use std::{ collections::BTreeMap, @@ -251,10 +252,10 @@ impl TransformConfig for RemapConfig { }) .map_err(|_| ()); - let mut dropped_definitions = Vec::new(); - let mut default_definitions = Vec::new(); + let mut dropped_definitions = HashMap::new(); + let mut default_definitions = HashMap::new(); - for (_output_id, input_definition) in input_definitions { + for (output_id, input_definition) in input_definitions { let default_definition = compiled .clone() .map(|(state, meaning)| { @@ -338,8 +339,8 @@ impl TransformConfig for RemapConfig { ); } - default_definitions.push(default_definition); - dropped_definitions.push(dropped_definition); + default_definitions.insert(output_id.clone(), default_definition); + dropped_definitions.insert(output_id.clone(), dropped_definition); } let default_output = TransformOutput::new(DataType::all(), default_definitions); @@ -445,8 +446,9 @@ where // TODO we can now have multiple possible definitions. // This is going to need to be updated to store these possible definitions and then // choose the correct one based on the input the event has come from. - .get(0) - .cloned() + .iter() + .map(|(_output, definition)| definition.clone()) + .next() .unwrap_or_else(Definition::any); let dropped_schema_definition = context @@ -454,8 +456,9 @@ where .get(&Some(DROPPED.to_owned())) .or_else(|| context.schema_definitions.get(&None)) .expect("dropped schema required") - .get(0) - .cloned() + .iter() + .map(|(_output, definition)| definition.clone()) + .next() .unwrap_or_else(Definition::any); Ok(Remap { @@ -702,10 +705,13 @@ mod tests { fn remap(config: RemapConfig) -> Result> { let schema_definitions = HashMap::from([ - (None, vec![test_default_schema_definition()]), + ( + None, + [("source".into(), test_default_schema_definition())].into(), + ), ( Some(DROPPED.to_owned()), - vec![test_dropped_schema_definition()], + [("source".into(), test_dropped_schema_definition())].into(), ), ]); @@ -1176,10 +1182,13 @@ mod tests { ..Default::default() }; let schema_definitions = HashMap::from([ - (None, vec![test_default_schema_definition()]), + ( + None, + [("source".into(), test_default_schema_definition())].into(), + ), ( Some(DROPPED.to_owned()), - vec![test_dropped_schema_definition()], + [("source".into(), test_dropped_schema_definition())].into(), ), ]); let context = TransformContext { @@ -1448,7 +1457,7 @@ mod tests { ), vec![TransformOutput::new( DataType::all(), - vec![schema_definition] + [("test".into(), schema_definition)].into() )] ); @@ -1514,8 +1523,8 @@ mod tests { fn collect_outputs(ft: &mut dyn SyncTransform, event: Event) -> CollectedOuput { let mut outputs = TransformOutputsBuf::new_with_capacity( vec![ - TransformOutput::new(DataType::all(), vec![]), - TransformOutput::new(DataType::all(), vec![]).with_port(DROPPED), + TransformOutput::new(DataType::all(), HashMap::new()), + TransformOutput::new(DataType::all(), HashMap::new()).with_port(DROPPED), ], 1, ); @@ -1541,8 +1550,8 @@ mod tests { ) -> std::result::Result { let mut outputs = TransformOutputsBuf::new_with_capacity( vec![ - TransformOutput::new(DataType::all(), vec![]), - TransformOutput::new(DataType::all(), vec![]).with_port(DROPPED), + TransformOutput::new(DataType::all(), HashMap::new()), + TransformOutput::new(DataType::all(), HashMap::new()).with_port(DROPPED), ], 1, ); diff --git a/src/transforms/route.rs b/src/transforms/route.rs index 1a456daff6b3d..971d678ffe170 100644 --- a/src/transforms/route.rs +++ b/src/transforms/route.rs @@ -1,6 +1,6 @@ use indexmap::IndexMap; use vector_config::configurable_component; -use vector_core::config::LogNamespace; +use vector_core::config::{clone_input_definitions, LogNamespace}; use vector_core::transform::SyncTransform; use crate::{ @@ -113,25 +113,13 @@ impl TransformConfig for RouteConfig { .route .keys() .map(|output_name| { - TransformOutput::new( - DataType::all(), - input_definitions - .iter() - .map(|(_output, definition)| definition.clone()) - .collect(), - ) - .with_port(output_name) + TransformOutput::new(DataType::all(), clone_input_definitions(input_definitions)) + .with_port(output_name) }) .collect(); result.push( - TransformOutput::new( - DataType::all(), - input_definitions - .iter() - .map(|(_output, definition)| definition.clone()) - .collect(), - ) - .with_port(UNMATCHED_ROUTE), + TransformOutput::new(DataType::all(), clone_input_definitions(input_definitions)) + .with_port(UNMATCHED_ROUTE), ); result } @@ -143,6 +131,8 @@ impl TransformConfig for RouteConfig { #[cfg(test)] mod test { + use std::collections::HashMap; + use indoc::indoc; use vector_core::transform::TransformOutputsBuf; @@ -201,7 +191,8 @@ mod test { output_names .iter() .map(|output_name| { - TransformOutput::new(DataType::all(), vec![]).with_port(output_name.to_owned()) + TransformOutput::new(DataType::all(), HashMap::new()) + .with_port(output_name.to_owned()) }) .collect(), 1, @@ -242,7 +233,8 @@ mod test { output_names .iter() .map(|output_name| { - TransformOutput::new(DataType::all(), vec![]).with_port(output_name.to_owned()) + TransformOutput::new(DataType::all(), HashMap::new()) + .with_port(output_name.to_owned()) }) .collect(), 1, @@ -282,7 +274,8 @@ mod test { output_names .iter() .map(|output_name| { - TransformOutput::new(DataType::all(), vec![]).with_port(output_name.to_owned()) + TransformOutput::new(DataType::all(), HashMap::new()) + .with_port(output_name.to_owned()) }) .collect(), 1, diff --git a/src/transforms/sample.rs b/src/transforms/sample.rs index 40e16e2e653d9..eec1a2652c4ac 100644 --- a/src/transforms/sample.rs +++ b/src/transforms/sample.rs @@ -78,7 +78,7 @@ impl TransformConfig for SampleConfig { DataType::Log | DataType::Trace, input_definitions .iter() - .map(|(_output, definition)| definition.clone()) + .map(|(output, definition)| (output.clone(), definition.clone())) .collect(), )] } diff --git a/src/transforms/tag_cardinality_limit/config.rs b/src/transforms/tag_cardinality_limit/config.rs index 6a83d70d98ff3..8eca913f8c416 100644 --- a/src/transforms/tag_cardinality_limit/config.rs +++ b/src/transforms/tag_cardinality_limit/config.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::config::{ DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext, TransformOutput, }; @@ -115,6 +117,6 @@ impl TransformConfig for TagCardinalityLimitConfig { _: &[(OutputId, schema::Definition)], _: LogNamespace, ) -> Vec { - vec![TransformOutput::new(DataType::Metric, vec![])] + vec![TransformOutput::new(DataType::Metric, HashMap::new())] } } diff --git a/src/transforms/throttle.rs b/src/transforms/throttle.rs index bdc61383db18b..4b97a40410e0b 100644 --- a/src/transforms/throttle.rs +++ b/src/transforms/throttle.rs @@ -6,7 +6,7 @@ use governor::{clock, Quota, RateLimiter}; use serde_with::serde_as; use snafu::Snafu; use vector_config::configurable_component; -use vector_core::config::LogNamespace; +use vector_core::config::{clone_input_definitions, LogNamespace}; use crate::{ conditions::{AnyCondition, Condition}, @@ -67,10 +67,7 @@ impl TransformConfig for ThrottleConfig { // The event is not modified, so the definition is passed through as-is vec![TransformOutput::new( DataType::Log, - input_definitions - .iter() - .map(|(_output, definition)| definition.clone()) - .collect(), + clone_input_definitions(input_definitions), )] } }