Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3ba9721
Dont panic with non object field kinds
StephenWakely Apr 13, 2023
0705a76
Check remap input definition is never
StephenWakely Apr 13, 2023
4123f82
Spelling
StephenWakely Apr 13, 2023
0e86124
Add enrichment tables to the transform outputs
StephenWakely Apr 17, 2023
ef60749
Handle remap array outputs
StephenWakely Apr 18, 2023
8ed6528
Return the panics
StephenWakely Apr 19, 2023
55ddb8d
Revert "Return the panics"
StephenWakely Apr 19, 2023
bd50d03
Return the panics
StephenWakely Apr 19, 2023
516cb30
Added multiple transform tests
StephenWakely Apr 19, 2023
4ad079f
Mild spacing
StephenWakely Apr 19, 2023
1e7c02b
Mild spacing
StephenWakely Apr 20, 2023
a0fd678
Add transport_output_ids to wrap calls to output that don't need defi…
StephenWakely Apr 20, 2023
5e46f19
Test even a VRL error results in the correct ports
StephenWakely Apr 20, 2023
5ec3cfd
Test a mix of array and non array results
StephenWakely Apr 21, 2023
84db500
Error if a returned definition contains never
StephenWakely Apr 24, 2023
252f3a7
Remove never check in transform, the framework should handle it
StephenWakely Apr 24, 2023
68defbf
Feedback from Kyle
StephenWakely Apr 25, 2023
5d7a8b2
Feedback from Kyle
StephenWakely Apr 25, 2023
8eaaa85
Feedback from Spencer
StephenWakely Apr 26, 2023
2e8ac39
Spelling
StephenWakely Apr 26, 2023
8214abc
Feedback from Nathan
StephenWakely Apr 27, 2023
3e0bb2b
Whitespace
StephenWakely Apr 27, 2023
f472c07
Only use transform definition fields when schema is enabled
StephenWakely Apr 27, 2023
925f413
Fixed syntax error
StephenWakely Apr 27, 2023
dced80a
Clippy
StephenWakely Apr 27, 2023
569994a
Fixed test. Non object top level fields need removing
StephenWakely Apr 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ pub struct TransformOutput {
/// enabled, at least one definition should be output. If the transform
/// has multiple connected sources, it is possible to have multiple output
/// definitions - one for each input.
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
log_schema_definitions: HashMap<OutputId, schema::Definition>,
}

impl TransformOutput {
Expand All @@ -222,6 +222,37 @@ impl TransformOutput {
self.port = Some(name.into());
self
}

/// Returns the schema [`schema::Definition`]s attached to this output, keyed by [`OutputId`].
///
/// The `schema_enabled` flag (set in the user's configuration) selects what is returned:
///
/// - `true`: a copy of the full stored definitions, including fields and their types.
/// - `false`: for each stored definition, a simple definition that is just the default for
///   that definition's log namespaces. When those namespaces include the Vector namespace,
///   the meanings of the original definition are carried over as well.
#[must_use]
pub fn schema_definitions(
    &self,
    schema_enabled: bool,
) -> HashMap<OutputId, schema::Definition> {
    // With schemas enabled the stored definitions are handed back untouched.
    if schema_enabled {
        return self.log_schema_definitions.clone();
    }

    // Otherwise reduce every definition to the namespace default.
    self.log_schema_definitions
        .iter()
        .map(|(output_id, full_definition)| {
            let namespaces = full_definition.log_namespaces();
            let mut simple_definition = schema::Definition::default_for_namespace(namespaces);

            // Meanings are only carried over when the Vector namespace is in play.
            if namespaces.contains(&LogNamespace::Vector) {
                simple_definition.add_meanings(full_definition.meanings());
            }

            (output_id.clone(), simple_definition)
        })
        .collect()
}
}

/// Simple utility function that can be used by transforms that make no changes to
Expand Down
20 changes: 8 additions & 12 deletions src/api/schema/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
use vector_config::NamedComponent;
use vector_core::internal_event::DEFAULT_OUTPUT;

use crate::topology::schema::possible_definitions;
use crate::{
api::schema::{
components::state::component_by_component_key,
filter::{self, filter_items},
relay, sort,
},
config::{ComponentKey, Config},
config::{get_transform_output_ids, ComponentKey, Config},
filter_check,
};

Expand Down Expand Up @@ -254,7 +253,6 @@ impl ComponentsSubscription {

/// Update the 'global' configuration that will be consumed by component queries
pub fn update_config(config: &Config) {
let mut cache = HashMap::new();
let mut new_components = HashMap::new();

// Sources
Expand Down Expand Up @@ -291,15 +289,13 @@ pub fn update_config(config: &Config) {
component_key: component_key.clone(),
component_type: transform.inner.get_component_name().to_string(),
inputs: transform.inputs.clone(),
outputs: transform
.inner
.outputs(
&possible_definitions(&transform.inputs, config, &mut cache),
config.schema.log_namespace(),
)
.into_iter()
.map(|output| output.port.unwrap_or_else(|| DEFAULT_OUTPUT.to_string()))
.collect(),
outputs: get_transform_output_ids(
transform.inner.as_ref(),
"".into(),
config.schema.log_namespace(),
)
.map(|output| output.port.unwrap_or_else(|| DEFAULT_OUTPUT.to_string()))
.collect(),
})),
);
}
Expand Down
14 changes: 3 additions & 11 deletions src/config/compiler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use indexmap::IndexSet;

use super::{
builder::ConfigBuilder, graph::Graph, id::Inputs, schema, validation, Config, OutputId,
builder::ConfigBuilder, graph::Graph, id::Inputs, transform::get_transform_output_ids,
validation, Config, OutputId,
};

pub fn compile(mut builder: ConfigBuilder) -> Result<(Config, Vec<String>), Vec<String>> {
Expand Down Expand Up @@ -137,16 +138,7 @@ pub(crate) fn expand_globs(config: &mut ConfigBuilder) {
})
})
.chain(config.transforms.iter().flat_map(|(key, t)| {
t.inner
.outputs(
&[(key.into(), schema::Definition::any())],
config.schema.log_namespace(),
)
.into_iter()
.map(|output| OutputId {
component: key.clone(),
port: output.port,
})
get_transform_output_ids(t.inner.as_ref(), key.clone(), config.schema.log_namespace())
}))
.map(|output_id| output_id.to_string())
.collect::<IndexSet<String>>();
Expand Down
1 change: 1 addition & 0 deletions src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl Graph {
Node::Transform {
in_ty: transform.inner.input().data_type(),
outputs: transform.inner.outputs(
enrichment::TableRegistry::default(),
&[(id.into(), schema::Definition::any())],
schema.log_namespace(),
),
Expand Down
4 changes: 3 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ pub use provider::ProviderConfig;
pub use secret::SecretBackend;
pub use sink::{SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
pub use transform::{BoxedTransform, TransformConfig, TransformContext, TransformOuter};
pub use transform::{
get_transform_output_ids, BoxedTransform, TransformConfig, TransformContext, TransformOuter,
};
pub use unit_test::{build_unit_tests, build_unit_tests_main, UnitTestResult};
pub use validation::warnings;
pub use vector_core::config::{
Expand Down
21 changes: 21 additions & 0 deletions src/config/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send +
/// of events flowing through the transform.
fn outputs(
&self,
enrichment_tables: enrichment::TableRegistry,
input_definitions: &[(OutputId, schema::Definition)],
global_log_namespace: LogNamespace,
) -> Vec<TransformOutput>;
Expand Down Expand Up @@ -236,3 +237,23 @@ pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send +
}

dyn_clone::clone_trait_object!(TransformConfig);

/// Returns the [`OutputId`]s of a transform without requiring real schema definitions.
///
/// Often `outputs` is called only to learn which ports a transform exposes. This helper
/// calls it with a default enrichment table registry and a single placeholder input
/// (`schema::Definition::any()` keyed by `key`), then pairs each returned port with `key`.
pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
    transform: &T,
    key: ComponentKey,
    global_log_namespace: LogNamespace,
) -> impl Iterator<Item = OutputId> + '_ {
    // Placeholder input: only the resulting ports matter, not the definitions.
    let placeholder_input: (OutputId, schema::Definition) =
        (key.clone().into(), schema::Definition::any());

    let outputs = transform.outputs(
        enrichment::TableRegistry::default(),
        &[placeholder_input],
        global_log_namespace,
    );

    outputs.into_iter().map(move |output| OutputId {
        component: key.clone(),
        port: output.port,
    })
}
34 changes: 13 additions & 21 deletions src/config/unit_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use self::unit_test_components::{
UnitTestSinkCheck, UnitTestSinkConfig, UnitTestSinkResult, UnitTestSourceConfig,
UnitTestStreamSinkConfig, UnitTestStreamSourceConfig,
};
use super::{compiler::expand_globs, graph::Graph, OutputId};
use super::{compiler::expand_globs, graph::Graph, transform::get_transform_output_ids, OutputId};
use crate::{
conditions::Condition,
config::{
Expand Down Expand Up @@ -186,14 +186,11 @@ impl UnitTestBuildMetadata {
.transforms
.iter()
.flat_map(|(key, transform)| {
transform
.inner
.outputs(&[], builder.schema.log_namespace())
.into_iter()
.map(|output| OutputId {
component: key.clone(),
port: output.port,
})
get_transform_output_ids(
transform.inner.as_ref(),
key.clone(),
builder.schema.log_namespace(),
)
})
.collect::<HashSet<_>>();

Expand Down Expand Up @@ -457,18 +454,13 @@ async fn build_unit_test(
fn get_loose_end_outputs_sink(config: &ConfigBuilder) -> Option<SinkOuter<String>> {
let config = config.clone();
let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
transform
.inner
.outputs(&[], config.schema.log_namespace())
.iter()
.map(|output| {
if let Some(port) = &output.port {
OutputId::from((key, port.clone())).to_string()
} else {
key.to_string()
}
})
.collect::<Vec<_>>()
get_transform_output_ids(
transform.inner.as_ref(),
key.clone(),
config.schema.log_namespace(),
)
.map(|output| output.to_string())
.collect::<Vec<_>>()
});

let mut loose_end_outputs = Vec::new();
Expand Down
45 changes: 18 additions & 27 deletions src/config/validation.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::{config::schema, topology::schema::possible_definitions};
use crate::config::schema;
use futures_util::{stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use heim::{disk::Partition, units::information::byte};
use indexmap::IndexMap;
use std::{collections::HashMap, path::PathBuf};
use vector_core::internal_event::DEFAULT_OUTPUT;

use super::{builder::ConfigBuilder, ComponentKey, Config, OutputId, Resource};
use super::{
builder::ConfigBuilder, transform::get_transform_output_ids, ComponentKey, Config, OutputId,
Resource,
};

/// Check that provide + topology config aren't present in the same builder, which is an error.
pub fn check_provider(config: &ConfigBuilder) -> Result<(), Vec<String>> {
Expand Down Expand Up @@ -169,15 +172,12 @@ pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
}

if transform
.inner
.outputs(
&[(OutputId::dummy(), definition)],
config.schema.log_namespace(),
)
.iter()
.map(|output| output.port.as_deref().unwrap_or(""))
.any(|name| name == DEFAULT_OUTPUT)
if get_transform_output_ids(
transform.inner.as_ref(),
key.clone(),
config.schema.log_namespace(),
)
.any(|output| matches!(output.port, Some(output) if output == DEFAULT_OUTPUT))
{
errors.push(format!(
"Transform {key} cannot have a named output with reserved name: `{DEFAULT_OUTPUT}`"
Expand Down Expand Up @@ -325,7 +325,6 @@ async fn process_partitions(partitions: Vec<Partition>) -> heim::Result<IndexMap

pub fn warnings(config: &Config) -> Vec<String> {
let mut warnings = vec![];
let mut cache = HashMap::new();

let source_ids = config.sources.iter().flat_map(|(key, source)| {
source
Expand All @@ -342,21 +341,13 @@ pub fn warnings(config: &Config) -> Vec<String> {
.collect::<Vec<_>>()
});
let transform_ids = config.transforms.iter().flat_map(|(key, transform)| {
transform
.inner
.outputs(
&possible_definitions(&transform.inputs, config, &mut cache),
config.schema.log_namespace(),
)
.iter()
.map(|output| {
if let Some(port) = &output.port {
("transform", OutputId::from((key, port.clone())))
} else {
("transform", OutputId::from(key))
}
})
.collect::<Vec<_>>()
get_transform_output_ids(
transform.inner.as_ref(),
key.clone(),
config.schema.log_namespace(),
)
.map(|output| ("transform", output))
.collect::<Vec<_>>()
});

for (input_type, id) in transform_ids.chain(source_ids) {
Expand Down
6 changes: 5 additions & 1 deletion src/test_util/mock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use self::{
BackpressureSourceConfig, BasicSourceConfig, ErrorSourceConfig, PanicSourceConfig,
TripwireSourceConfig,
},
transforms::BasicTransformConfig,
transforms::{BasicTransformConfig, ErrorDefinitionTransformConfig},
};

pub mod sinks;
Expand Down Expand Up @@ -66,6 +66,10 @@ pub fn basic_transform(suffix: &str, increase: f64) -> BasicTransformConfig {
BasicTransformConfig::new(suffix.to_owned(), increase)
}

/// Creates an [`ErrorDefinitionTransformConfig`] mock transform configuration.
pub const fn error_definition_transform() -> ErrorDefinitionTransformConfig {
ErrorDefinitionTransformConfig {}
}

/// Creates a [`BackpressureSinkConfig`] with the given `num_to_consume` value.
pub const fn backpressure_sink(num_to_consume: usize) -> BackpressureSinkConfig {
BackpressureSinkConfig { num_to_consume }
}
Expand Down
1 change: 1 addition & 0 deletions src/test_util/mock/transforms/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl TransformConfig for BasicTransformConfig {

fn outputs(
&self,
_: enrichment::TableRegistry,
definitions: &[(OutputId, schema::Definition)],
_: LogNamespace,
) -> Vec<TransformOutput> {
Expand Down
62 changes: 62 additions & 0 deletions src/test_util/mock/transforms/error_definitions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use async_trait::async_trait;
use snafu::Snafu;
use value::Kind;
use vector_config::configurable_component;
use vector_core::{
config::{DataType, Input, LogNamespace, TransformOutput},
schema::Definition,
transform::Transform,
};

use crate::config::{OutputId, TransformConfig, TransformContext};

/// Error returned when building the `test_error_definition` mock transform.
#[derive(Debug, Snafu)]
enum Error {
// The only failure this mock reports.
#[snafu(display("It all went horribly wrong"))]
ItAllWentHorriblyWrong,
}

// Test-only mock transform configuration; it declares no options.
/// Configuration for the `test_error_definition` transform.
#[configurable_component(transform("test_error_definition", "Test (error definition)"))]
#[derive(Clone, Debug, Default)]
pub struct ErrorDefinitionTransformConfig {}

impl_generate_config_from_default!(ErrorDefinitionTransformConfig);

#[async_trait]
#[typetag::serde(name = "test_error_definition")]
impl TransformConfig for ErrorDefinitionTransformConfig {
    /// Accepts every kind of input.
    fn input(&self) -> Input {
        Input::all()
    }

    /// Declares a single output of every data type whose schema definition, for each
    /// input definition received, has a kind of `Kind::never`.
    fn outputs(
        &self,
        _: enrichment::TableRegistry,
        definitions: &[(OutputId, Definition)],
        _: LogNamespace,
    ) -> Vec<TransformOutput> {
        // A definition of `Kind::never` implies that we can never return a value.
        let never_definitions = definitions
            .iter()
            .map(|(output_id, input_definition)| {
                let never_definition = Definition::new_with_default_metadata(
                    Kind::never(),
                    input_definition.log_namespaces().clone(),
                );

                (output_id.clone(), never_definition)
            })
            .collect();

        vec![TransformOutput::new(DataType::all(), never_definitions)]
    }

    /// Always fails with [`Error::ItAllWentHorriblyWrong`].
    async fn build(&self, _: &TransformContext) -> crate::Result<Transform> {
        // Although `outputs` only reports `Kind::never` definitions, `build` still has to
        // be called for this error to be returned.
        Err(Error::ItAllWentHorriblyWrong.into())
    }
}
Loading