Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions docs/tutorials/sinks/1_basic_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,7 @@ Provide some module level comments to explain what the sink does.
Let's setup all the imports we will need for the tutorial:

```rust
use super::Healthcheck;
use crate::config::{GenerateConfig, SinkConfig, SinkContext};
use futures::{stream::BoxStream, StreamExt};
use vector_common::finalization::{EventStatus, Finalizable};
use vector_config::configurable_component;
use vector_core::{
config::{AcknowledgementsConfig, Input},
event::Event,
sink::{StreamSink, VectorSink},
};
use crate::prelude::*;
```

# Configuration
Expand Down
23 changes: 1 addition & 22 deletions docs/tutorials/sinks/2_http_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,11 @@ To start, update our imports to the following:
use std::task::Poll;

use crate::{
config::{GenerateConfig, SinkConfig, SinkContext},
sinks::prelude::*,
http::HttpClient,
internal_events::SinkRequestBuildError,
sinks::util::{
encoding::{write_all, Encoder},
metadata::RequestMetadataBuilder,
request_builder::EncodeResult,
Compression, RequestBuilder, SinkBuilderExt,
},
sinks::Healthcheck,
};
use bytes::Bytes;
use futures::{future::BoxFuture, stream::BoxStream, StreamExt};
use vector_common::{
finalization::{EventFinalizers, EventStatus, Finalizable},
internal_event::CountByteSize,
request_metadata::{MetaDescriptive, RequestMetadata},
};
use vector_config::configurable_component;
use vector_core::{
config::{AcknowledgementsConfig, Input},
event::Event,
sink::{StreamSink, VectorSink},
stream::DriverResponse,
tls::TlsSettings,
};
```

# Configuration
Expand Down
2 changes: 1 addition & 1 deletion src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub(crate) use self::unix::*;
pub(crate) use self::websocket::*;
#[cfg(windows)]
pub(crate) use self::windows::*;
pub(crate) use self::{
pub use self::{
adaptive_concurrency::*, batch::*, common::*, conditions::*, encoding_transcode::*,
heartbeat::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*,
};
Expand Down
11 changes: 1 addition & 10 deletions src/sinks/amqp/config.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
//! Configuration functionality for the `AMQP` sink.
use crate::{
amqp::AmqpConfig,
codecs::EncodingConfig,
config::{DataType, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{Healthcheck, VectorSink},
template::Template,
};
use crate::{amqp::AmqpConfig, sinks::prelude::*};
use codecs::TextSerializerConfig;
use futures::FutureExt;
use lapin::{types::ShortString, BasicProperties};
use std::sync::Arc;
use vector_config::configurable_component;
use vector_core::config::AcknowledgementsConfig;

use super::sink::AmqpSink;

Expand Down
7 changes: 2 additions & 5 deletions src/sinks/amqp/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
//! Encoding for the `AMQP` sink.
use crate::{
event::Event,
sinks::util::encoding::{write_all, Encoder},
};
use crate::sinks::prelude::*;
use bytes::BytesMut;
use std::io;
use tokio_util::codec::Encoder as _;
Expand All @@ -13,7 +10,7 @@ pub(super) struct AmqpEncoder {
pub(super) transformer: crate::codecs::Transformer,
}

impl Encoder<Event> for AmqpEncoder {
impl encoding::Encoder<Event> for AmqpEncoder {
fn encode_input(&self, mut input: Event, writer: &mut dyn io::Write) -> io::Result<usize> {
let mut body = BytesMut::new();
self.transformer.transform(&mut input);
Expand Down
14 changes: 1 addition & 13 deletions src/sinks/amqp/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,10 @@
//! Request builder for the `AMQP` sink.
//! Responsible for taking the event (which includes rendered template values) and turning
//! it into the raw bytes and other data needed to send the request to `AMQP`.
use crate::{
event::Event,
sinks::util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
RequestBuilder,
},
};
use crate::sinks::prelude::*;
use bytes::Bytes;
use lapin::BasicProperties;
use std::io;
use vector_common::{
finalization::{EventFinalizers, Finalizable},
json_size::JsonSize,
request_metadata::RequestMetadata,
};
use vector_core::EstimatedJsonEncodedSizeOf;

use super::{encoder::AmqpEncoder, service::AmqpRequest, sink::AmqpEvent};

Expand Down
13 changes: 4 additions & 9 deletions src/sinks/amqp/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
//! The main tower service that takes the request created by the request builder
//! and sends it to `AMQP`.
use crate::internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError};
use crate::{
internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError},
sinks::prelude::*,
};
use bytes::Bytes;
use futures::future::BoxFuture;
use lapin::{options::BasicPublishOptions, BasicProperties};
Expand All @@ -9,14 +12,6 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tower::Service;
use vector_common::{
finalization::{EventFinalizers, EventStatus, Finalizable},
internal_event::CountByteSize,
json_size::JsonSize,
request_metadata::{MetaDescriptive, RequestMetadata},
};
use vector_core::stream::DriverResponse;

/// The request contains the data to send to `AMQP` together
/// with the information need to route the message.
Expand Down
12 changes: 1 addition & 11 deletions src/sinks/amqp/sink.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
//! The sink for the `AMQP` sink that wires together the main stream that takes the
//! event and sends it to `AMQP`.
use crate::{
codecs::Transformer, event::Event, internal_events::TemplateRenderingError,
sinks::util::builder::SinkBuilderExt, template::Template,
};
use async_trait::async_trait;
use futures::StreamExt;
use futures_util::stream::BoxStream;
use crate::sinks::prelude::*;
use lapin::{options::ConfirmSelectOptions, BasicProperties};
use serde::Serialize;
use std::sync::Arc;
use tower::ServiceBuilder;
use vector_buffers::EventCount;
use vector_common::json_size::JsonSize;
use vector_core::{sink::StreamSink, ByteSizeOf, EstimatedJsonEncodedSizeOf};

use super::{
config::{AmqpPropertiesConfig, AmqpSinkConfig},
Expand Down
18 changes: 6 additions & 12 deletions src/sinks/aws_kinesis/config.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
use std::marker::PhantomData;

use tower::ServiceBuilder;
use vector_config::configurable_component;
use vector_core::{
config::{DataType, Input},
sink::VectorSink,
stream::BatcherSettings,
};
use vector_core::stream::BatcherSettings;

use crate::{
aws::{AwsAuthentication, RegionOrEndpoint},
codecs::{Encoder, EncodingConfig},
config::AcknowledgementsConfig,
sinks::util::{retries::RetryLogic, Compression, ServiceBuilderExt, TowerRequestConfig},
tls::TlsConfig,
sinks::{
prelude::*,
util::{retries::RetryLogic, TowerRequestConfig},
},
};

use super::{
Expand Down Expand Up @@ -78,7 +72,7 @@ impl KinesisSinkBaseConfig {
}

/// Builds an aws_kinesis sink.
pub async fn build_sink<C, R, RR, E, RT>(
pub fn build_sink<C, R, RR, E, RT>(
config: &KinesisSinkBaseConfig,
partition_key_field: Option<String>,
batch_settings: BatcherSettings,
Expand Down
3 changes: 1 addition & 2 deletions src/sinks/aws_kinesis/firehose/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@ impl SinkConfig for KinesisFirehoseSinkConfig {
None,
batch_settings,
KinesisFirehoseClient { client },
)
.await?;
)?;

Ok((sink, healthcheck))
}
Expand Down
7 changes: 2 additions & 5 deletions src/sinks/aws_kinesis/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@ use std::{

use aws_smithy_client::SdkError;
use aws_types::region::Region;
use futures::future::BoxFuture;
use tower::Service;
use vector_common::{json_size::JsonSize, request_metadata::MetaDescriptive};
use vector_core::{internal_event::CountByteSize, stream::DriverResponse};
use vector_core::internal_event::CountByteSize;

use super::{
record::{Record, SendRecord},
sink::BatchKinesisRequest,
};
use crate::event::EventStatus;
use crate::{event::EventStatus, sinks::prelude::*};

pub struct KinesisService<C, T, E> {
pub client: C,
Expand Down
17 changes: 4 additions & 13 deletions src/sinks/aws_kinesis/sink.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize};

use async_trait::async_trait;
use futures::{future, stream::BoxStream, StreamExt};
use rand::random;
use tower::Service;
use vector_common::{
finalization::{EventFinalizers, Finalizable},
request_metadata::{MetaDescriptive, RequestMetadata},
};
use vector_core::{
partition::Partitioner,
stream::{BatcherSettings, DriverResponse},
};

use crate::{
event::{Event, LogEvent},
internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
sinks::util::{processed_event::ProcessedEvent, SinkBuilderExt, StreamSink},
sinks::{
prelude::*,
util::{processed_event::ProcessedEvent, StreamSink},
},
};

use super::{
Expand Down
3 changes: 1 addition & 2 deletions src/sinks/aws_kinesis/streams/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ impl SinkConfig for KinesisStreamsSinkConfig {
self.partition_key_field.clone(),
batch_settings,
KinesisStreamClient { client },
)
.await?;
)?;

Ok((sink, healthcheck))
}
Expand Down
7 changes: 1 addition & 6 deletions src/sinks/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,15 @@ use futures::FutureExt;
use rdkafka::ClientConfig;
use serde_with::serde_as;
use vector_config::configurable_component;
use vector_core::schema::Requirement;
use vrl::value::Kind;

use crate::{
codecs::EncodingConfig,
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
kafka::{KafkaAuthConfig, KafkaCompression},
serde::json::to_string,
sinks::{
kafka::sink::{healthcheck, KafkaSink},
util::{BatchConfig, NoDefaultsBatchSettings},
Healthcheck, VectorSink,
prelude::*,
},
template::Template,
};

pub(crate) const QUEUED_MIN_MESSAGES: u64 = 100000;
Expand Down
18 changes: 3 additions & 15 deletions src/sinks/kafka/service.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,17 @@
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::future::BoxFuture;
use rdkafka::{
error::KafkaError,
message::OwnedHeaders,
producer::{FutureProducer, FutureRecord},
util::Timeout,
};
use tower::Service;
use vector_common::{
json_size::JsonSize,
request_metadata::{MetaDescriptive, RequestMetadata},
};
use vector_core::{
internal_event::{
ByteSize, BytesSent, CountByteSize, InternalEventHandle as _, Protocol, Registered,
},
stream::DriverResponse,
use vector_core::internal_event::{
ByteSize, BytesSent, CountByteSize, InternalEventHandle as _, Protocol, Registered,
};

use crate::{
event::{EventFinalizers, EventStatus, Finalizable},
kafka::KafkaStatisticsContext,
};
use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};

pub struct KafkaRequest {
pub body: Bytes,
Expand Down
15 changes: 4 additions & 11 deletions src/sinks/kafka/sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use async_trait::async_trait;
use futures::{future, stream::BoxStream, StreamExt};
use futures::future;
use rdkafka::{
consumer::{BaseConsumer, Consumer},
error::KafkaError,
Expand All @@ -12,17 +11,11 @@ use tower::limit::ConcurrencyLimit;

use super::config::{KafkaRole, KafkaSinkConfig};
use crate::{
codecs::{Encoder, Transformer},
event::{Event, LogEvent},
kafka::KafkaStatisticsContext,
sinks::{
kafka::{
config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder,
service::KafkaService,
},
util::{builder::SinkBuilderExt, StreamSink},
sinks::kafka::{
config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder, service::KafkaService,
},
template::{Template, TemplateParseError},
sinks::prelude::*,
};

#[derive(Debug, Snafu)]
Expand Down
4 changes: 1 addition & 3 deletions src/sinks/kafka/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ mod integration_test {
sink::KafkaSink,
*,
},
util::{BatchConfig, NoDefaultsBatchSettings},
VectorSink,
prelude::*,
},
template::Template,
test_util::{
components::{assert_sink_compliance, SINK_TAGS},
random_lines_with_stream, random_string, wait_for,
Expand Down
11 changes: 1 addition & 10 deletions src/sinks/loki/config.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
use std::collections::HashMap;

use futures::future::FutureExt;
use vector_config::configurable_component;
use vrl::value::Kind;

use super::{healthcheck::healthcheck, sink::LokiSink};
use crate::{
codecs::EncodingConfig,
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
http::{Auth, HttpClient, MaybeAuth},
schema,
sinks::{
util::{BatchConfig, Compression, SinkBatchSettings, TowerRequestConfig, UriSerde},
VectorSink,
},
template::Template,
tls::{TlsConfig, TlsSettings},
sinks::{prelude::*, util::UriSerde},
};

/// Loki-specific compression.
Expand Down
Loading