Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions benches/remap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use chrono::{DateTime, Utc};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use indexmap::IndexMap;
use vector::{
config::{DataType, Output},
config::{DataType, TransformOutput},
event::{Event, LogEvent, Value},
transforms::{
remap::{Remap, RemapConfig},
Expand All @@ -27,8 +27,10 @@ fn benchmark_remap(c: &mut Criterion) {
let mut group = c.benchmark_group("remap");

let add_fields_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs =
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();
Expand Down Expand Up @@ -77,8 +79,10 @@ fn benchmark_remap(c: &mut Criterion) {
});

let json_parser_runner = |tform: &mut Box<dyn SyncTransform>, event: Event| {
let mut outputs =
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();
Expand Down Expand Up @@ -129,8 +133,10 @@ fn benchmark_remap(c: &mut Criterion) {

let coerce_runner =
|tform: &mut Box<dyn SyncTransform>, event: Event, timestamp: DateTime<Utc>| {
let mut outputs =
TransformOutputsBuf::new_with_capacity(vec![Output::default(DataType::all())], 1);
let mut outputs = TransformOutputsBuf::new_with_capacity(
vec![TransformOutput::new(DataType::all(), HashMap::new())],
1,
);
tform.transform(event, &mut outputs);
let result = outputs.take_primary();
let output_1 = result.first().unwrap().as_log();
Expand Down
6 changes: 3 additions & 3 deletions benches/transform/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vector::transforms::{
TransformOutputsBuf,
};
use vector_core::{
config::{DataType, Output},
config::{DataType, TransformOutput},
event::{Event, EventContainer, EventMetadata, LogEvent},
transform::{SyncTransform, TransformContext},
};
Expand Down Expand Up @@ -54,10 +54,10 @@ fn route(c: &mut Criterion) {
"bba", "bbca", "dba", "bea", "fba", "gba", "hba", "iba", "jba", "bka", "bal", "bma", "bna",
"boa", "bpa", "bqa", "bra", "bsa", "bta", "bua", "bva", "bwa", "xba", "aby", "zba",
] {
outputs.push(Output {
outputs.push(TransformOutput {
port: Some(String::from(name)),
ty: DataType::Log,
log_schema_definition: None,
log_schema_definitions: Vec::new(),
});
}
let output_buffer: TransformOutputsBuf = TransformOutputsBuf::new_with_capacity(outputs, 10);
Expand Down
236 changes: 216 additions & 20 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::{fmt, num::NonZeroUsize};
use std::{collections::HashMap, fmt, num::NonZeroUsize};

use bitmask_enum::bitmask;
use bytes::Bytes;
use chrono::{DateTime, Utc};

mod global_options;
mod log_schema;
pub mod output_id;
pub mod proxy;

use crate::event::LogEvent;
pub use global_options::GlobalOptions;
pub use log_schema::{init_log_schema, log_schema, LogSchema};
use lookup::{lookup_v2::ValuePath, path, PathPrefix};
pub use output_id::OutputId;
use serde::{Deserialize, Serialize};
use value::Value;
pub use vector_common::config::ComponentKey;
Expand Down Expand Up @@ -100,42 +102,119 @@ impl Input {
}

#[derive(Debug, Clone, PartialEq)]
pub struct Output {
pub struct SourceOutput {
pub port: Option<String>,
pub ty: DataType,

// NOTE: schema definitions are only implemented/supported for log-type events. There is no
// inherent blocker to support other types as well, but it'll require additional work to add
// the relevant schemas, and store them separately in this type.
pub schema_definition: Option<schema::Definition>,
}

impl SourceOutput {
/// Create a `SourceOutput` of the given data type that contains a single output `Definition`.
/// Designed for use in log sources.
///
/// The `None` variant of a schema definition has two distinct meanings for a source component
/// versus a transform component:
///
/// For *sources*, a `None` schema is identical to a `Some(Definition::source_default())`.
/// # Panics
///
/// For a *transform*, a schema [`schema::Definition`] is required if `Datatype` is Log.
pub log_schema_definition: Option<schema::Definition>,
}
/// Panics if `ty` does not contain [`DataType::Log`].
#[must_use]
pub fn new_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
assert!(ty.contains(DataType::Log));

impl Output {
/// Create a default `Output` of the given data type.
///
/// A default output is one without a port identifier (i.e. not a named output) and the default
/// output consumers will receive if they declare the component itself as an input.
pub fn default(ty: DataType) -> Self {
Self {
port: None,
ty,
log_schema_definition: None,
schema_definition: Some(schema_definition),
}
}

/// Build an unnamed `SourceOutput` for a metrics source.
///
/// The data type is fixed to [`DataType::Metric`], no port name is set, and no schema
/// [`schema::Definition`] is attached.
#[must_use]
pub fn new_metrics() -> Self {
    Self {
        ty: DataType::Metric,
        schema_definition: None,
        port: None,
    }
}

/// Build an unnamed `SourceOutput` for a trace source.
///
/// The data type is fixed to [`DataType::Trace`], no port name is set, and no schema
/// [`schema::Definition`] is attached.
#[must_use]
pub fn new_traces() -> Self {
    Self {
        ty: DataType::Trace,
        schema_definition: None,
        port: None,
    }
}

/// Return this output's schema [`schema::Definition`], if it has one.
///
/// `schema_enabled` reflects the schema setting in the user's configuration:
///
/// - `true`: the stored definition is returned in full, including its fields and their
///   associated types.
/// - `false`: only the default definition for the stored definition's log namespaces is
///   returned. When those namespaces include [`LogNamespace::Vector`], the stored meanings
///   are carried over onto it.
///
/// Returns `None` when no definition is stored on this output.
#[must_use]
pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
    let definition = self.schema_definition.as_ref()?;

    if schema_enabled {
        return Some(definition.clone());
    }

    // Schemas are disabled: start from the namespace default and keep only the meanings.
    let mut simplified = schema::Definition::default_for_namespace(definition.log_namespaces());
    if definition.log_namespaces().contains(&LogNamespace::Vector) {
        simplified.add_meanings(definition.meanings());
    }

    Some(simplified)
}
}

impl SourceOutput {
/// Set the port name for this `SourceOutput`.
#[must_use]
pub fn with_schema_definition(mut self, schema_definition: schema::Definition) -> Self {
self.log_schema_definition = Some(schema_definition);
pub fn with_port(mut self, name: impl Into<String>) -> Self {
self.port = Some(name.into());
self
}
}

/// Describes a single output of a transform component.
#[derive(Debug, Clone, PartialEq)]
pub struct TransformOutput {
/// Name of the output port; `None` when the output has no port name.
pub port: Option<String>,
/// The event data type(s) declared for this output.
pub ty: DataType,

/// Schema definitions for the log events of this output, keyed by [`OutputId`].
///
/// For *transforms*, if `ty` is [`DataType::Log`] and schema is enabled, at least one
/// definition should be output. If the transform has multiple connected sources, it is
/// possible to have multiple output definitions - one for each input.
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}

impl TransformOutput {
/// Build an unnamed `TransformOutput` for the data type `ty`.
///
/// `schema_definitions` maps each [`OutputId`] to its [`schema::Definition`] and is stored
/// unchanged; the port name is left unset. Designed for use in transforms.
#[must_use]
pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
    Self {
        log_schema_definitions: schema_definitions,
        ty,
        port: None,
    }
}

/// Set the port name for this `Output`.
#[must_use]
Expand All @@ -145,6 +224,18 @@ impl Output {
}
}

/// Copy a list of input definitions into a [`HashMap`] keyed by [`OutputId`].
///
/// Convenience helper for transforms that leave the schema definitions of events untouched:
/// every `(OutputId, Definition)` pair in `input_definitions` is cloned into the returned map.
pub fn clone_input_definitions(
    input_definitions: &[(OutputId, schema::Definition)],
) -> HashMap<OutputId, schema::Definition> {
    input_definitions.iter().cloned().collect()
}

/// Source-specific end-to-end acknowledgements configuration.
///
/// This type exists solely to provide a source-specific description of the `acknowledgements`
Expand Down Expand Up @@ -427,10 +518,12 @@ impl LogNamespace {

#[cfg(test)]
mod test {
use crate::config::{init_log_schema, LogNamespace, LogSchema};
use super::*;
use crate::event::LogEvent;
use chrono::Utc;
use lookup::event_path;
use lookup::{event_path, owned_value_path, OwnedTargetPath};
use value::Kind;
use vector_common::btreemap;

#[test]
fn test_insert_standard_vector_source_metadata() {
Expand All @@ -446,4 +539,107 @@ mod test {

assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
}

// Checks `SourceOutput::schema_definition` for a definition built in the legacy namespace:
// with schemas enabled the full definition (meanings and field types) is returned; with
// schemas disabled the result equals the default legacy definition.
#[test]
fn test_source_definitions_legacy() {
// `zork` is declared as bytes with the meaning "zork"; `nork` is an integer with no meaning.
let definition = schema::Definition::empty_legacy_namespace()
.with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
let output = SourceOutput::new_logs(DataType::Log, definition);

// Carries both declared fields.
let valid_event = LogEvent::from(Value::from(btreemap! {
"zork" => "norknoog",
"nork" => 32
}))
.into();

// Lacks the `zork` field declared in the definition.
let invalid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
}))
.into();

// Get a definition with schema enabled.
let new_definition = output.schema_definition(true).unwrap();

// Meanings should still exist.
assert_eq!(
Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
new_definition.meaning_path("zork")
);

// Events should have the schema validated.
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_invalid_for_event(&invalid_event);

// There should be the default legacy definition without schemas enabled.
assert_eq!(
Some(schema::Definition::default_legacy_namespace()),
output.schema_definition(false)
);
}

// Checks `SourceOutput::schema_definition` for a definition built in the Vector namespace:
// meanings are present both with and without schemas enabled, while field types are only
// enforced when schemas are enabled.
// NOTE(review): `definitons` in the test name looks like a typo for `definitions`; the name
// is left unchanged here.
#[test]
fn test_source_definitons_vector() {
// The metadata field `vector.zork` is declared as an integer with the meaning "zork";
// the event field `nork` is an integer with no meaning.
let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
.with_metadata_field(
&owned_value_path!("vector", "zork"),
Kind::integer(),
Some("zork"),
)
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

let output = SourceOutput::new_logs(DataType::Log, definition);

let mut valid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
}));

// The metadata value is an integer, matching the declared kind.
valid_event
.metadata_mut()
.value_mut()
.insert(path!("vector").concat("zork"), 32);

let valid_event = valid_event.into();

let mut invalid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
}));

// The metadata value is a string where the definition declares an integer.
invalid_event
.metadata_mut()
.value_mut()
.insert(path!("vector").concat("zork"), "noog");

let invalid_event = invalid_event.into();

// Get a definition with schema enabled.
let new_definition = output.schema_definition(true).unwrap();

// Meanings should still exist.
assert_eq!(
Some(&OwnedTargetPath::metadata(owned_value_path!(
"vector", "zork"
))),
new_definition.meaning_path("zork")
);

// Events should have the schema validated.
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_invalid_for_event(&invalid_event);

// Get a definition without schema enabled.
let new_definition = output.schema_definition(false).unwrap();

// Meanings should still exist.
assert_eq!(
Some(&OwnedTargetPath::metadata(owned_value_path!(
"vector", "zork"
))),
new_definition.meaning_path("zork")
);

// Events should not have the schema validated.
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_valid_for_event(&invalid_event);
}
}
Loading