Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 4 additions & 55 deletions docs/tutorials/sinks/1_basic_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ is deserialized to the fields in this struct so the user can customise the
sink's behaviour.

```rust
#[configurable_component(sink("basic"))]
#[configurable_component(sink("basic", "Basic sink."))]
#[derive(Clone, Debug)]
/// A basic sink that dumps its output to stdout.
pub struct BasicConfig {
Expand Down Expand Up @@ -75,10 +75,12 @@ configuration for the sink.
# SinkConfig

We need to implement the [`SinkConfig`][sink_config] trait. This is used by
Vector to generate the main Sink from the configuration.
Vector to generate the main Sink from the configuration. Note that type name
given to `typetag` below must match the name of the configurable component above.

```rust
#[async_trait::async_trait]
#[typetag::serde(name = "basic")]
impl SinkConfig for BasicConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let healthcheck = Box::pin(async move { Ok(()) });
Expand Down Expand Up @@ -198,59 +200,6 @@ sinks-logs = [
"sinks-chronicle",
```

## Module

Import this module into Vector. In `src/sinks/mod.rs` add the lines:


```diff
#[cfg(feature = "sinks-azure_monitor_logs")]
pub mod azure_monitor_logs;
+ #[cfg(feature = "sinks-basic")]
+ pub mod basic;
#[cfg(feature = "sinks-blackhole")]
pub mod blackhole;
```

All sinks are feature gated, this allows us to build custom versions of Vector
with only the components required. We will ignore the feature flag for now with
our new basic sink.

Next, each sink needs to be added to the [`Sinks`][sinks_enum] enum. Find the
enum in `mod.rs` and add our new sink to it.

```diff
#[configurable_component]
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
#[enum_dispatch(SinkConfig)]
pub enum Sinks {
...

+ /// Basic
+ #[cfg(feature = "sinks-basic")]
+ Basic(#[configurable(derived)] basic::BasicConfig),

...

```

Then we need to add this to the `get_component_name` function defined below.

```diff

fn get_component_name(&self) -> &'static str {
match self {
...

+ #[cfg(feature = "sinks-basic")]
+ Self::Basic(config) => config.get_component_name(),

...

```

# Acknowledgements

When our sink finishes processing the event, it needs to acknowledge this so
Expand Down
1 change: 0 additions & 1 deletion src/api/schema/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::{
use async_graphql::{Enum, InputObject, Interface, Object, Subscription};
use once_cell::sync::Lazy;
use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt};
use vector_config::NamedComponent;
use vector_core::internal_event::DEFAULT_OUTPUT;

use crate::{
Expand Down
6 changes: 3 additions & 3 deletions src/components/validation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod test_case;
pub mod util;
mod validators;

use crate::{config::BoxedSource, config::BoxedTransform, sinks::Sinks};
use crate::config::{BoxedSink, BoxedSource, BoxedTransform};

pub use self::resources::*;
#[cfg(feature = "component-validation-runner")]
Expand Down Expand Up @@ -46,7 +46,7 @@ pub enum ComponentConfiguration {
Transform(BoxedTransform),

/// A sink component.
Sink(Sinks),
Sink(BoxedSink),
}

/// Configuration for validating a component.
Expand Down Expand Up @@ -88,7 +88,7 @@ impl ValidationConfiguration {
}

/// Creates a new `ValidationConfiguration` for a sink.
pub fn from_sink<C: Into<Sinks>>(
pub fn from_sink<C: Into<BoxedSink>>(
component_name: &'static str,
config: C,
external_resource: Option<ExternalResource>,
Expand Down
8 changes: 4 additions & 4 deletions src/components/validation/runner/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use crate::{
util::GrpcAddress,
ComponentConfiguration, ComponentType, ValidationConfiguration,
},
config::{BoxedSource, BoxedTransform, ConfigBuilder},
sinks::{vector::VectorConfig as VectorSinkConfig, Sinks},
config::{BoxedSink, BoxedSource, BoxedTransform, ConfigBuilder},
sinks::vector::VectorConfig as VectorSinkConfig,
sources::vector::VectorConfig as VectorSourceConfig,
test_util::next_addr,
};
Expand Down Expand Up @@ -78,7 +78,7 @@ impl TopologyBuilder {
}
}

fn from_sink(sink: Sinks) -> Self {
fn from_sink(sink: BoxedSink) -> Self {
let (input_edge, input_source) = build_input_edge();

let mut config_builder = ConfigBuilder::default();
Expand Down Expand Up @@ -130,7 +130,7 @@ fn build_input_edge() -> (InputEdge, impl Into<BoxedSource>) {
(input_edge, input_source)
}

fn build_output_edge() -> (OutputEdge, impl Into<Sinks>) {
fn build_output_edge() -> (OutputEdge, impl Into<BoxedSink>) {
let output_listen_addr = GrpcAddress::from(next_addr());
debug!(endpoint = %output_listen_addr, "Creating controlled output edge.");

Expand Down
17 changes: 10 additions & 7 deletions src/config/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,16 @@ use serde_json::Value;
use vector_config::configurable_component;
use vector_core::config::GlobalOptions;

use crate::{
enrichment_tables::EnrichmentTables, providers::Providers, secrets::SecretBackends,
sinks::Sinks,
};
use crate::{enrichment_tables::EnrichmentTables, providers::Providers, secrets::SecretBackends};

#[cfg(feature = "api")]
use super::api;
#[cfg(feature = "enterprise")]
use super::enterprise;
use super::{
compiler, schema, BoxedSource, BoxedTransform, ComponentKey, Config, EnrichmentTableOuter,
HealthcheckOptions, SinkOuter, SourceOuter, TestDefinition, TransformOuter,
compiler, schema, BoxedSink, BoxedSource, BoxedTransform, ComponentKey, Config,
EnrichmentTableOuter, HealthcheckOptions, SinkOuter, SourceOuter, TestDefinition,
TransformOuter,
};

/// A complete Vector configuration.
Expand Down Expand Up @@ -269,7 +267,12 @@ impl ConfigBuilder {
.insert(ComponentKey::from(key.into()), SourceOuter::new(source));
}

pub fn add_sink<K: Into<String>, S: Into<Sinks>>(&mut self, key: K, inputs: &[&str], sink: S) {
pub fn add_sink<K: Into<String>, S: Into<BoxedSink>>(
&mut self,
key: K,
inputs: &[&str],
sink: S,
) {
let inputs = inputs
.iter()
.map(|value| value.to_string())
Expand Down
4 changes: 2 additions & 2 deletions src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use indexmap::{set::IndexSet, IndexMap};
use std::collections::{HashMap, HashSet, VecDeque};

use super::{
schema, ComponentKey, DataType, OutputId, SinkConfig, SinkOuter, SourceOuter, SourceOutput,
TransformOuter, TransformOutput,
schema, ComponentKey, DataType, OutputId, SinkOuter, SourceOuter, SourceOutput, TransformOuter,
TransformOutput,
};

#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub use loading::{
};
pub use provider::ProviderConfig;
pub use secret::SecretBackend;
pub use sink::{SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
pub use sink::{BoxedSink, SinkConfig, SinkContext, SinkHealthcheckOptions, SinkOuter};
pub use source::{BoxedSource, SourceConfig, SourceContext, SourceOuter};
pub use transform::{
get_transform_output_ids, BoxedTransform, TransformConfig, TransformContext, TransformOuter,
Expand Down
48 changes: 41 additions & 7 deletions src/config/sink.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,47 @@
use std::cell::RefCell;

use async_trait::async_trait;
use enum_dispatch::enum_dispatch;
use dyn_clone::DynClone;
use serde::Serialize;
use vector_buffers::{BufferConfig, BufferType};
use vector_config::{configurable_component, Configurable, NamedComponent};
use vector_config::{
configurable_component, Configurable, GenerateError, Metadata, NamedComponent,
};
use vector_config_common::attributes::CustomAttribute;
use vector_config_common::schema::{SchemaGenerator, SchemaObject};
use vector_core::{
config::{AcknowledgementsConfig, GlobalOptions, Input},
sink::VectorSink,
};

use super::{id::Inputs, schema, ComponentKey, ProxyConfig, Resource};
use crate::sinks::{util::UriSerde, Healthcheck, Sinks};
use crate::sinks::{util::UriSerde, Healthcheck};

pub type BoxedSink = Box<dyn SinkConfig>;

impl Configurable for BoxedSink {
fn referenceable_name() -> Option<&'static str> {
Some("vector::sinks::Sinks")
}

fn metadata() -> Metadata {
let mut metadata = Metadata::default();
metadata.set_description("Configurable sinks in Vector.");
metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
metadata
}

fn generate_schema(gen: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError> {
vector_config::component::SinkDescription::generate_schemas(gen)
}
}

impl<T: SinkConfig + 'static> From<T> for BoxedSink {
fn from(value: T) -> Self {
Box::new(value)
}
}

/// Fully resolved sink component.
#[configurable_component]
Expand Down Expand Up @@ -49,7 +81,7 @@ where

#[serde(flatten)]
#[configurable(metadata(docs::hidden))]
pub inner: Sinks,
pub inner: BoxedSink,
}

impl<T> SinkOuter<T>
Expand All @@ -59,7 +91,7 @@ where
pub fn new<I, IS>(inputs: I, inner: IS) -> SinkOuter<T>
where
I: IntoIterator<Item = T>,
IS: Into<Sinks>,
IS: Into<BoxedSink>,
{
SinkOuter {
inputs: Inputs::from_iter(inputs),
Expand Down Expand Up @@ -170,8 +202,8 @@ impl From<UriSerde> for SinkHealthcheckOptions {

/// Generalized interface for describing and building sink components.
#[async_trait]
#[enum_dispatch]
pub trait SinkConfig: NamedComponent + core::fmt::Debug + Send + Sync {
#[typetag::serde(tag = "type")]
pub trait SinkConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
/// Builds the sink with the given context.
///
/// If the sink is built successfully, `Ok(...)` is returned containing the sink and the sink's
Expand Down Expand Up @@ -201,6 +233,8 @@ pub trait SinkConfig: NamedComponent + core::fmt::Debug + Send + Sync {
fn acknowledgements(&self) -> &AcknowledgementsConfig;
}

dyn_clone::clone_trait_object!(SinkConfig);

#[derive(Debug, Clone)]
pub struct SinkContext {
pub healthcheck: SinkHealthcheckOptions,
Expand Down
6 changes: 4 additions & 2 deletions src/config/unit_test/unit_test_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub struct UnitTestSinkResult {
}

/// Configuration for the `unit_test` sink.
#[configurable_component(sink("unit_test"))]
#[configurable_component(sink("unit_test", "Unit test."))]
#[derive(Clone, Default, Derivative)]
#[derivative(Debug)]
pub struct UnitTestSinkConfig {
Expand All @@ -158,6 +158,7 @@ pub struct UnitTestSinkConfig {
impl_generate_config_from_default!(UnitTestSinkConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "unit_test")]
impl SinkConfig for UnitTestSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let tx = self.result_tx.lock().await.take();
Expand Down Expand Up @@ -272,7 +273,7 @@ impl StreamSink<Event> for UnitTestSink {
}

/// Configuration for the `unit_test_stream` sink.
#[configurable_component(sink("unit_test_stream"))]
#[configurable_component(sink("unit_test_stream", "Unit test stream."))]
#[derive(Clone, Default)]
pub struct UnitTestStreamSinkConfig {
/// Sink that receives the processed events.
Expand All @@ -297,6 +298,7 @@ impl std::fmt::Debug for UnitTestStreamSinkConfig {
}

#[async_trait::async_trait]
#[typetag::serde(name = "unit_test_stream")]
impl SinkConfig for UnitTestStreamSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sink = self.sink.lock().await.take().unwrap();
Expand Down
6 changes: 5 additions & 1 deletion src/sinks/amqp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ impl AmqpPropertiesConfig {
/// Configuration for the `amqp` sink.
///
/// Supports AMQP version 0.9.1
#[configurable_component(sink("amqp"))]
#[configurable_component(sink(
"amqp",
"Send events to AMQP 0.9.1 compatible brokers like RabbitMQ."
))]
#[derive(Clone, Debug)]
pub struct AmqpSinkConfig {
/// The exchange to publish messages to.
Expand Down Expand Up @@ -89,6 +92,7 @@ impl GenerateConfig for AmqpSinkConfig {
}

#[async_trait::async_trait]
#[typetag::serde(name = "amqp")]
impl SinkConfig for AmqpSinkConfig {
async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let sink = AmqpSink::new(self.clone()).await?;
Expand Down
3 changes: 2 additions & 1 deletion src/sinks/appsignal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ enum FinishError {
}

/// Configuration for the `appsignal` sink.
#[configurable_component(sink("appsignal"))]
#[configurable_component(sink("appsignal", "Send events to AppSignal."))]
#[derive(Clone, Debug, Default)]
pub struct AppsignalSinkConfig {
/// The URI for the AppSignal API to send data to.
Expand Down Expand Up @@ -106,6 +106,7 @@ impl SinkBatchSettings for AppsignalDefaultBatchSettings {
impl_generate_config_from_default!(AppsignalSinkConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "appsignal")]
impl SinkConfig for AppsignalSinkConfig {
async fn build(
&self,
Expand Down
6 changes: 5 additions & 1 deletion src/sinks/aws_cloudwatch_logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ impl ClientBuilder for CloudwatchLogsClientBuilder {
}

/// Configuration for the `aws_cloudwatch_logs` sink.
#[configurable_component(sink("aws_cloudwatch_logs"))]
#[configurable_component(sink(
"aws_cloudwatch_logs",
"Publish log events to AWS CloudWatch Logs."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct CloudwatchLogsSinkConfig {
Expand Down Expand Up @@ -161,6 +164,7 @@ impl CloudwatchLogsSinkConfig {
}

#[async_trait::async_trait]
#[typetag::serde(name = "aws_cloudwatch_logs")]
impl SinkConfig for CloudwatchLogsSinkConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let batcher_settings = self.batch.into_batcher_settings()?;
Expand Down
Loading