Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benches/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn benchmark_remap(c: &mut Criterion) {

let add_fields_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), vec![])],
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
Expand Down Expand Up @@ -80,7 +80,7 @@ fn benchmark_remap(c: &mut Criterion) {

let json_parser_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), vec![])],
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
Expand Down Expand Up @@ -134,7 +134,7 @@ fn benchmark_remap(c: &mut Criterion) {
let coerce_runner =
|tform: &mut Box<dyn SyncTransform>, event: Event, timestamp: DateTime<Utc>| {
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), vec![])],
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
Expand Down
20 changes: 17 additions & 3 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::{fmt, num::NonZeroUsize};
use std::{collections::HashMap, fmt, num::NonZeroUsize};

use bitmask_enum::bitmask;
use bytes::Bytes;
use chrono::{DateTime, Utc};

mod global_options;
mod log_schema;
pub mod output_id;
pub mod proxy;

use crate::event::LogEvent;
pub use global_options::GlobalOptions;
pub use log_schema::{init_log_schema, log_schema, LogSchema};
use lookup::{lookup_v2::ValuePath, path, PathPrefix};
pub use output_id::OutputId;
use serde::{Deserialize, Serialize};
use value::Value;
pub use vector_common::config::ComponentKey;
Expand Down Expand Up @@ -199,14 +201,14 @@ pub struct TransformOutput {
/// enabled, at least one definition should be output. If the transform
/// has multiple connected sources, it is possible to have multiple output
/// definitions - one for each input.
pub log_schema_definitions: Vec<schema::Definition>,
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}

impl TransformOutput {
/// Create a `TransformOutput` of the given data type that contains multiple [`schema::Definition`]s.
/// Designed for use in transforms.
#[must_use]
pub fn new(ty: DataType, schema_definitions: Vec<schema::Definition>) -> Self {
pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
Self {
port: None,
ty,
Expand All @@ -222,6 +224,18 @@ impl TransformOutput {
}
}

/// Builds an owned [`HashMap`] from a slice of `(OutputId, Definition)` pairs.
///
/// Intended for transforms that leave the schema definitions of events untouched:
/// every input pair is cloned as-is and keyed by its [`OutputId`] in the returned map.
pub fn clone_input_definitions(
    input_definitions: &[(OutputId, schema::Definition)],
) -> HashMap<OutputId, schema::Definition> {
    input_definitions.iter().cloned().collect()
}

/// Source-specific end-to-end acknowledgements configuration.
///
/// This type exists solely to provide a source-specific description of the `acknowledgements`
Expand Down
90 changes: 90 additions & 0 deletions lib/vector-core/src/config/output_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::fmt;

use vector_common::config::ComponentKey;

use crate::{config::configurable_component, schema};

/// Component output identifier.
// Pairs a component key with an optional output port name. `Eq` and `Hash` are derived,
// which is what allows this type to be used as a `HashMap` key.
// NOTE(review): `configurable_component` may consume the `///` text on this type and its
// fields - confirm before rewording any of it.
#[configurable_component]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct OutputId {
/// The component to which the output belongs.
pub component: ComponentKey,

/// The output port name, if not the default.
// `None` stands for the default port.
pub port: Option<String>,
}

impl OutputId {
/// Some situations, for example when validating a config file requires running the
/// `transforms::output` function to retrieve the outputs, but we don't have an
/// `OutputId` from a source. This gives us an `OutputId` that we can use.
///
/// TODO: This is not a pleasant solution, but would require some significant refactoring
/// to the topology code to avoid.
pub fn dummy() -> Self {
Self {
component: "dummy".into(),
port: None,
}
}

/// Given a list of [`schema::Definition`]s, returns a [`Vec`] of tuples of
/// this `OutputId` with each `Definition`.
pub fn with_definitions(
&self,
definitions: impl IntoIterator<Item = schema::Definition>,
) -> Vec<(OutputId, schema::Definition)> {
definitions
.into_iter()
.map(|definition| (self.clone(), definition))
.collect()
}
}

impl fmt::Display for OutputId {
    /// Writes `component.port` when a port is set, otherwise just the component.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(port) = &self.port {
            write!(f, "{}.{port}", self.component)
        } else {
            // Default port: defer entirely to the component's own formatting.
            self.component.fmt(f)
        }
    }
}

impl From<ComponentKey> for OutputId {
    /// Wraps an owned component key as an `OutputId` with no port set.
    fn from(component: ComponentKey) -> Self {
        Self {
            component,
            port: None,
        }
    }
}

impl From<&ComponentKey> for OutputId {
    /// Clones the borrowed key and converts it through the owned-key conversion.
    fn from(key: &ComponentKey) -> Self {
        let owned: ComponentKey = key.clone();
        owned.into()
    }
}

impl From<(&ComponentKey, String)> for OutputId {
    /// Builds an `OutputId` from a borrowed component key and an explicit port name.
    fn from(pair: (&ComponentKey, String)) -> Self {
        let (component, port) = pair;
        Self {
            component: component.clone(),
            port: Some(port),
        }
    }
}

// Test-only convenience conversion. It panics on dotted input, so it must never be
// compiled into non-test builds.
#[cfg(any(test, feature = "test"))]
impl From<&str> for OutputId {
    /// Treats `s` as a bare component name (no port).
    ///
    /// # Panics
    ///
    /// Panics if `s` contains a `.`.
    fn from(s: &str) -> Self {
        let has_dot = s.contains('.');
        assert!(
            !has_dot,
            "Cannot convert dotted paths to strings without more context"
        );
        ComponentKey::from(s).into()
    }
}
17 changes: 10 additions & 7 deletions src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ mod test {
in_ty,
outputs: vec![TransformOutput::new(
out_ty,
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
)],
},
);
Expand All @@ -415,8 +415,11 @@ mod test {
let id = id.into();
match self.nodes.get_mut(&id) {
Some(Node::Transform { outputs, .. }) => outputs.push(
TransformOutput::new(ty, vec![Definition::default_legacy_namespace()])
.with_port(name),
TransformOutput::new(
ty,
[("test".into(), Definition::default_legacy_namespace())].into(),
)
.with_port(name),
),
_ => panic!("invalid transform"),
}
Expand Down Expand Up @@ -651,11 +654,11 @@ mod test {
outputs: vec![
TransformOutput::new(
DataType::all(),
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
),
TransformOutput::new(
DataType::all(),
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
)
.with_port("bar"),
],
Expand All @@ -676,11 +679,11 @@ mod test {
outputs: vec![
TransformOutput::new(
DataType::all(),
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
),
TransformOutput::new(
DataType::all(),
vec![Definition::default_legacy_namespace()],
[("test".into(), Definition::default_legacy_namespace())].into(),
)
.with_port("errors"),
],
Expand Down
89 changes: 1 addition & 88 deletions src/config/id.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{fmt, ops::Deref};
use std::ops::Deref;

use vector_config::configurable_component;
pub use vector_core::config::ComponentKey;

use super::schema;

/// A list of upstream [source][sources] or [transform][transforms] IDs.
///
/// Wildcards (`*`) are supported.
Expand Down Expand Up @@ -96,88 +94,3 @@ impl<T> From<Vec<T>> for Inputs<T> {
Self(inputs)
}
}

/// Component output identifier.
// Pairs a component key with an optional output port name. `Eq` and `Hash` are derived,
// so values can be compared and hashed.
// NOTE(review): `configurable_component` may consume the `///` text on this type and its
// fields - confirm before rewording any of it.
#[configurable_component]
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct OutputId {
/// The component to which the output belongs.
pub component: ComponentKey,

/// The output port name, if not the default.
// `None` stands for the default port.
pub port: Option<String>,
}

impl OutputId {
/// Some situations, for example when validating a config file requires running the
/// transforms::output function to retrieve the outputs, but we don't have an
/// `OutputId` from a source. This gives us an `OutputId` that we can use.
///
/// TODO: This is not a pleasant solution, but would require some significant refactoring
/// to the topology code to avoid.
pub fn dummy() -> Self {
Self {
component: "dummy".into(),
port: None,
}
}

/// Given a list of [`schema::Definition`]s, returns a [`Vec`] of tuples of
/// this `OutputId` with each `Definition`.
pub fn with_definitions(
&self,
definitions: impl IntoIterator<Item = schema::Definition>,
) -> Vec<(OutputId, schema::Definition)> {
definitions
.into_iter()
.map(|definition| (self.clone(), definition))
.collect()
}
}

impl fmt::Display for OutputId {
    /// Writes `component.port` when a port is set, otherwise just the component.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(port) = &self.port {
            write!(f, "{}.{}", self.component, port)
        } else {
            // Default port: defer entirely to the component's own formatting.
            self.component.fmt(f)
        }
    }
}

impl From<ComponentKey> for OutputId {
    /// Wraps an owned component key as an `OutputId` with no port set.
    fn from(component: ComponentKey) -> Self {
        Self {
            component,
            port: None,
        }
    }
}

impl From<&ComponentKey> for OutputId {
    /// Clones the borrowed key and converts it through the owned-key conversion.
    fn from(key: &ComponentKey) -> Self {
        let owned: ComponentKey = key.clone();
        owned.into()
    }
}

impl From<(&ComponentKey, String)> for OutputId {
    /// Builds an `OutputId` from a borrowed component key and an explicit port name.
    fn from(pair: (&ComponentKey, String)) -> Self {
        let (component, port) = pair;
        Self {
            component: component.clone(),
            port: Some(port),
        }
    }
}

// Test-only convenience conversion. It panics on dotted input, so it must never be
// compiled into non-test builds.
#[cfg(test)]
impl From<&str> for OutputId {
    /// Treats `s` as a bare component name (no port).
    ///
    /// # Panics
    ///
    /// Panics if `s` contains a `.`.
    fn from(s: &str) -> Self {
        let has_dot = s.contains('.');
        assert!(
            !has_dot,
            "Cannot convert dotted paths to strings without more context"
        );
        ComponentKey::from(s).into()
    }
}
6 changes: 4 additions & 2 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub use cmd::{cmd, Opts};
pub use diff::ConfigDiff;
pub use enrichment_table::{EnrichmentTableConfig, EnrichmentTableOuter};
pub use format::{Format, FormatHint};
pub use id::{ComponentKey, Inputs, OutputId};
pub use id::{ComponentKey, Inputs};
pub use loading::{
load, load_builder_from_paths, load_from_paths, load_from_paths_with_provider_and_secrets,
load_from_str, load_source_from_paths, merge_path_lists, process_paths, CONFIG_PATHS,
Expand All @@ -57,7 +57,9 @@ pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
pub use transform::{BoxedTransform, TransformConfig, TransformContext, TransformOuter};
pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
pub use validation::warnings;
pub use vector_core::config::{init_log_schema, log_schema, proxy::ProxyConfig, LogSchema};
pub use vector_core::config::{
init_log_schema, log_schema, proxy::ProxyConfig, LogSchema, OutputId,
};

#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
pub enum ConfigPath {
Expand Down
8 changes: 5 additions & 3 deletions src/config/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub struct TransformContext {
///
/// Given a transform can expose multiple [`TransformOutput`] channels, the ID is tied to the identifier of
/// that `TransformOutput`.
pub schema_definitions: HashMap<Option<String>, Vec<schema::Definition>>,
pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,

/// The schema definition created by merging all inputs of the transform.
///
Expand All @@ -129,7 +129,7 @@ impl Default for TransformContext {
key: Default::default(),
globals: Default::default(),
enrichment_tables: Default::default(),
schema_definitions: HashMap::from([(None, vec![schema::Definition::any()])]),
schema_definitions: HashMap::from([(None, HashMap::new())]),
merged_schema_definition: schema::Definition::any(),
schema: SchemaOptions::default(),
}
Expand All @@ -148,7 +148,9 @@ impl TransformContext {
}

#[cfg(any(test, feature = "test"))]
pub fn new_test(schema_definitions: HashMap<Option<String>, Vec<schema::Definition>>) -> Self {
pub fn new_test(
schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
) -> Self {
Self {
schema_definitions,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion src/test_util/mock/transforms/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl TransformConfig for BasicTransformConfig {
DataType::all(),
definitions
.iter()
.map(|(_output, definition)| definition.clone())
.map(|(output, definition)| (output.clone(), definition.clone()))
.collect(),
)]
}
Expand Down
2 changes: 1 addition & 1 deletion src/test_util/mock/transforms/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl TransformConfig for NoopTransformConfig {
DataType::all(),
definitions
.iter()
.map(|(_output, definition)| definition.clone())
.map(|(output, definition)| (output.clone(), definition.clone()))
.collect(),
)]
}
Expand Down
Loading