diff --git a/docs/tutorials/sinks/1_basic_sink.md b/docs/tutorials/sinks/1_basic_sink.md index 4e999d227e822..ca91266925e80 100644 --- a/docs/tutorials/sinks/1_basic_sink.md +++ b/docs/tutorials/sinks/1_basic_sink.md @@ -22,16 +22,7 @@ Provide some module level comments to explain what the sink does. Let's setup all the imports we will need for the tutorial: ```rust -use super::Healthcheck; -use crate::config::{GenerateConfig, SinkConfig, SinkContext}; -use futures::{stream::BoxStream, StreamExt}; -use vector_common::finalization::{EventStatus, Finalizable}; -use vector_config::configurable_component; -use vector_core::{ - config::{AcknowledgementsConfig, Input}, - event::Event, - sink::{StreamSink, VectorSink}, -}; +use crate::prelude::*; ``` # Configuration diff --git a/docs/tutorials/sinks/2_http_sink.md b/docs/tutorials/sinks/2_http_sink.md index ed99ca4105d4b..7090ef41a88d1 100644 --- a/docs/tutorials/sinks/2_http_sink.md +++ b/docs/tutorials/sinks/2_http_sink.md @@ -12,32 +12,11 @@ To start, update our imports to the following: use std::task::Poll; use crate::{ - config::{GenerateConfig, SinkConfig, SinkContext}, + sinks::prelude::*, http::HttpClient, internal_events::SinkRequestBuildError, - sinks::util::{ - encoding::{write_all, Encoder}, - metadata::RequestMetadataBuilder, - request_builder::EncodeResult, - Compression, RequestBuilder, SinkBuilderExt, - }, - sinks::Healthcheck, }; use bytes::Bytes; -use futures::{future::BoxFuture, stream::BoxStream, StreamExt}; -use vector_common::{ - finalization::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; -use vector_config::configurable_component; -use vector_core::{ - config::{AcknowledgementsConfig, Input}, - event::Event, - sink::{StreamSink, VectorSink}, - stream::DriverResponse, - tls::TlsSettings, -}; ``` # Configuration diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index f357cbb0f469b..7d30daba29d97 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -259,7 +259,7 @@ pub(crate) use self::unix::*; pub(crate) use self::websocket::*; #[cfg(windows)] pub(crate) use self::windows::*; -pub(crate) use self::{ +pub use self::{ adaptive_concurrency::*, batch::*, common::*, conditions::*, encoding_transcode::*, heartbeat::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*, }; diff --git a/src/sinks/amqp/config.rs b/src/sinks/amqp/config.rs index 3af34e943d7b2..a83cb7c1f2aad 100644 --- a/src/sinks/amqp/config.rs +++ b/src/sinks/amqp/config.rs @@ -1,17 +1,8 @@ //! Configuration functionality for the `AMQP` sink. -use crate::{ - amqp::AmqpConfig, - codecs::EncodingConfig, - config::{DataType, GenerateConfig, Input, SinkConfig, SinkContext}, - sinks::{Healthcheck, VectorSink}, - template::Template, -}; +use crate::{amqp::AmqpConfig, sinks::prelude::*}; use codecs::TextSerializerConfig; -use futures::FutureExt; use lapin::{types::ShortString, BasicProperties}; use std::sync::Arc; -use vector_config::configurable_component; -use vector_core::config::AcknowledgementsConfig; use super::sink::AmqpSink; diff --git a/src/sinks/amqp/encoder.rs b/src/sinks/amqp/encoder.rs index 6e86828c82923..d3d449811372f 100644 --- a/src/sinks/amqp/encoder.rs +++ b/src/sinks/amqp/encoder.rs @@ -1,8 +1,5 @@ //! Encoding for the `AMQP` sink. -use crate::{ - event::Event, - sinks::util::encoding::{write_all, Encoder}, -}; +use crate::sinks::prelude::*; use bytes::BytesMut; use std::io; use tokio_util::codec::Encoder as _; @@ -13,7 +10,7 @@ pub(super) struct AmqpEncoder { pub(super) transformer: crate::codecs::Transformer, } -impl Encoder for AmqpEncoder { +impl encoding::Encoder for AmqpEncoder { fn encode_input(&self, mut input: Event, writer: &mut dyn io::Write) -> io::Result { let mut body = BytesMut::new(); self.transformer.transform(&mut input); diff --git a/src/sinks/amqp/request_builder.rs b/src/sinks/amqp/request_builder.rs index ad8fe36565453..ace1af1f66fe4 100644 --- a/src/sinks/amqp/request_builder.rs +++ b/src/sinks/amqp/request_builder.rs @@ -1,22 +1,10 @@ //! Request builder for the `AMQP` sink. //! Responsible for taking the event (which includes rendered template values) and turning //! it into the raw bytes and other data needed to send the request to `AMQP`. -use crate::{ - event::Event, - sinks::util::{ - metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression, - RequestBuilder, - }, -}; +use crate::sinks::prelude::*; use bytes::Bytes; use lapin::BasicProperties; use std::io; -use vector_common::{ - finalization::{EventFinalizers, Finalizable}, - json_size::JsonSize, - request_metadata::RequestMetadata, -}; -use vector_core::EstimatedJsonEncodedSizeOf; use super::{encoder::AmqpEncoder, service::AmqpRequest, sink::AmqpEvent}; diff --git a/src/sinks/amqp/service.rs b/src/sinks/amqp/service.rs index ff1e71487298a..20b16b99e6e39 100644 --- a/src/sinks/amqp/service.rs +++ b/src/sinks/amqp/service.rs @@ -1,6 +1,9 @@ //! The main tower service that takes the request created by the request builder //! and sends it to `AMQP`. -use crate::internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError}; +use crate::{ + internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError}, + sinks::prelude::*, +}; use bytes::Bytes; use futures::future::BoxFuture; use lapin::{options::BasicPublishOptions, BasicProperties}; @@ -9,14 +12,6 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tower::Service; -use vector_common::{ - finalization::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, - json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; -use vector_core::stream::DriverResponse; /// The request contains the data to send to `AMQP` together /// with the information need to route the message. diff --git a/src/sinks/amqp/sink.rs b/src/sinks/amqp/sink.rs index ff0bdeb9d0042..f1da0b8d944f0 100644 --- a/src/sinks/amqp/sink.rs +++ b/src/sinks/amqp/sink.rs @@ -1,19 +1,9 @@ //! The sink for the `AMQP` sink that wires together the main stream that takes the //! event and sends it to `AMQP`. -use crate::{ - codecs::Transformer, event::Event, internal_events::TemplateRenderingError, - sinks::util::builder::SinkBuilderExt, template::Template, -}; -use async_trait::async_trait; -use futures::StreamExt; -use futures_util::stream::BoxStream; +use crate::sinks::prelude::*; use lapin::{options::ConfirmSelectOptions, BasicProperties}; use serde::Serialize; use std::sync::Arc; -use tower::ServiceBuilder; -use vector_buffers::EventCount; -use vector_common::json_size::JsonSize; -use vector_core::{sink::StreamSink, ByteSizeOf, EstimatedJsonEncodedSizeOf}; use super::{ config::{AmqpPropertiesConfig, AmqpSinkConfig}, diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index e817d98881985..4e2d136a054ff 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -1,19 +1,13 @@ use std::marker::PhantomData; -use tower::ServiceBuilder; -use vector_config::configurable_component; -use vector_core::{ - config::{DataType, Input}, - sink::VectorSink, - stream::BatcherSettings, -}; +use vector_core::stream::BatcherSettings; use crate::{ aws::{AwsAuthentication, RegionOrEndpoint}, - codecs::{Encoder, EncodingConfig}, - config::AcknowledgementsConfig, - sinks::util::{retries::RetryLogic, Compression, ServiceBuilderExt, TowerRequestConfig}, - tls::TlsConfig, + sinks::{ + prelude::*, + util::{retries::RetryLogic, TowerRequestConfig}, + }, }; use super::{ @@ -78,7 +72,7 @@ impl KinesisSinkBaseConfig { } /// Builds an aws_kinesis sink. -pub async fn build_sink( +pub fn build_sink( config: &KinesisSinkBaseConfig, partition_key_field: Option, batch_settings: BatcherSettings, diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index 8255c6bf220bd..c8080e0711da3 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -141,8 +141,7 @@ impl SinkConfig for KinesisFirehoseSinkConfig { None, batch_settings, KinesisFirehoseClient { client }, - ) - .await?; + )?; Ok((sink, healthcheck)) } diff --git a/src/sinks/aws_kinesis/service.rs b/src/sinks/aws_kinesis/service.rs index 9ceeb8c8d4938..3539fee4e2eab 100644 --- a/src/sinks/aws_kinesis/service.rs +++ b/src/sinks/aws_kinesis/service.rs @@ -5,16 +5,13 @@ use std::{ use aws_smithy_client::SdkError; use aws_types::region::Region; -use futures::future::BoxFuture; -use tower::Service; -use vector_common::{json_size::JsonSize, request_metadata::MetaDescriptive}; -use vector_core::{internal_event::CountByteSize, stream::DriverResponse}; +use vector_core::internal_event::CountByteSize; use super::{ record::{Record, SendRecord}, sink::BatchKinesisRequest, }; -use crate::event::EventStatus; +use crate::{event::EventStatus, sinks::prelude::*}; pub struct KinesisService { pub client: C, diff --git a/src/sinks/aws_kinesis/sink.rs b/src/sinks/aws_kinesis/sink.rs index 0f74320ad4be1..bc3d53947c338 100644 --- a/src/sinks/aws_kinesis/sink.rs +++ b/src/sinks/aws_kinesis/sink.rs @@ -1,22 +1,13 @@ use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize}; -use async_trait::async_trait; -use futures::{future, stream::BoxStream, StreamExt}; use rand::random; -use tower::Service; -use vector_common::{ - finalization::{EventFinalizers, Finalizable}, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; -use vector_core::{ - partition::Partitioner, - stream::{BatcherSettings, DriverResponse}, -}; use crate::{ - event::{Event, LogEvent}, internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError}, - sinks::util::{processed_event::ProcessedEvent, SinkBuilderExt, StreamSink}, + sinks::{ + prelude::*, + util::{processed_event::ProcessedEvent, StreamSink}, + }, }; use super::{ diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index 9d3461a6d14c8..673ab7b4d212a 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -148,8 +148,7 @@ impl SinkConfig for KinesisStreamsSinkConfig { self.partition_key_field.clone(), batch_settings, KinesisStreamClient { client }, - ) - .await?; + )?; Ok((sink, healthcheck)) } diff --git a/src/sinks/kafka/config.rs b/src/sinks/kafka/config.rs index 4fe8a13ee3a20..9615c22ec7280 100644 --- a/src/sinks/kafka/config.rs +++ b/src/sinks/kafka/config.rs @@ -5,20 +5,15 @@ use futures::FutureExt; use rdkafka::ClientConfig; use serde_with::serde_as; use vector_config::configurable_component; -use vector_core::schema::Requirement; use vrl::value::Kind; use crate::{ - codecs::EncodingConfig, - config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, kafka::{KafkaAuthConfig, KafkaCompression}, serde::json::to_string, sinks::{ kafka::sink::{healthcheck, KafkaSink}, - util::{BatchConfig, NoDefaultsBatchSettings}, - Healthcheck, VectorSink, + prelude::*, }, - template::Template, }; pub(crate) const QUEUED_MIN_MESSAGES: u64 = 100000; diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 89a1fb5ce6827..f271a7a580e53 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -1,29 +1,17 @@ use std::task::{Context, Poll}; use bytes::Bytes; -use futures::future::BoxFuture; use rdkafka::{ error::KafkaError, message::OwnedHeaders, producer::{FutureProducer, FutureRecord}, util::Timeout, }; -use tower::Service; -use vector_common::{ - json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; -use vector_core::{ - internal_event::{ - ByteSize, BytesSent, CountByteSize, InternalEventHandle as _, Protocol, Registered, - }, - stream::DriverResponse, +use vector_core::internal_event::{ + ByteSize, BytesSent, CountByteSize, InternalEventHandle as _, Protocol, Registered, }; -use crate::{ - event::{EventFinalizers, EventStatus, Finalizable}, - kafka::KafkaStatisticsContext, -}; +use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*}; pub struct KafkaRequest { pub body: Bytes, diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index 80fe87d835744..e3060900f71c2 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -1,5 +1,4 @@ -use async_trait::async_trait; -use futures::{future, stream::BoxStream, StreamExt}; +use futures::future; use rdkafka::{ consumer::{BaseConsumer, Consumer}, error::KafkaError, @@ -12,17 +11,11 @@ use tower::limit::ConcurrencyLimit; use super::config::{KafkaRole, KafkaSinkConfig}; use crate::{ - codecs::{Encoder, Transformer}, - event::{Event, LogEvent}, kafka::KafkaStatisticsContext, - sinks::{ - kafka::{ - config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder, - service::KafkaService, - }, - util::{builder::SinkBuilderExt, StreamSink}, + sinks::kafka::{ + config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder, service::KafkaService, }, - template::{Template, TemplateParseError}, + sinks::prelude::*, }; #[derive(Debug, Snafu)] diff --git a/src/sinks/kafka/tests.rs b/src/sinks/kafka/tests.rs index 4efa9e4d313e3..ba1e62e1eeb9e 100644 --- a/src/sinks/kafka/tests.rs +++ b/src/sinks/kafka/tests.rs @@ -29,10 +29,8 @@ mod integration_test { sink::KafkaSink, *, }, - util::{BatchConfig, NoDefaultsBatchSettings}, - VectorSink, + prelude::*, }, - template::Template, test_util::{ components::{assert_sink_compliance, SINK_TAGS}, random_lines_with_stream, random_string, wait_for, diff --git a/src/sinks/loki/config.rs b/src/sinks/loki/config.rs index 108cfa0aa3f99..6cb74c426ec08 100644 --- a/src/sinks/loki/config.rs +++ b/src/sinks/loki/config.rs @@ -1,21 +1,12 @@ use std::collections::HashMap; -use futures::future::FutureExt; -use vector_config::configurable_component; use vrl::value::Kind; use super::{healthcheck::healthcheck, sink::LokiSink}; use crate::{ - codecs::EncodingConfig, - config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext}, http::{Auth, HttpClient, MaybeAuth}, schema, - sinks::{ - util::{BatchConfig, Compression, SinkBatchSettings, TowerRequestConfig, UriSerde}, - VectorSink, - }, - template::Template, - tls::{TlsConfig, TlsSettings}, + sinks::{prelude::*, util::UriSerde}, }; /// Loki-specific compression. diff --git a/src/sinks/loki/event.rs b/src/sinks/loki/event.rs index 1aede42e35954..6b85153c0655b 100644 --- a/src/sinks/loki/event.rs +++ b/src/sinks/loki/event.rs @@ -1,13 +1,9 @@ use std::{collections::HashMap, io}; +use crate::sinks::prelude::*; use bytes::Bytes; use serde::{ser::SerializeSeq, Serialize}; use vector_buffers::EventCount; -use vector_common::json_size::JsonSize; -use vector_core::{ - event::{EventFinalizers, Finalizable}, - ByteSizeOf, EstimatedJsonEncodedSizeOf, -}; use crate::sinks::util::encoding::{write_all, Encoder}; diff --git a/src/sinks/loki/service.rs b/src/sinks/loki/service.rs index ec62cb690e432..1ac3c871631cb 100644 --- a/src/sinks/loki/service.rs +++ b/src/sinks/loki/service.rs @@ -1,22 +1,15 @@ use std::task::{Context, Poll}; use bytes::Bytes; -use futures::future::BoxFuture; use http::StatusCode; use snafu::Snafu; -use tower::Service; use tracing::Instrument; -use vector_common::request_metadata::{MetaDescriptive, RequestMetadata}; -use vector_core::{ - event::{EventFinalizers, EventStatus, Finalizable}, - internal_event::CountByteSize, - stream::DriverResponse, -}; +use vector_core::internal_event::CountByteSize; use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression}; use crate::{ http::{Auth, HttpClient}, - sinks::util::{retries::RetryLogic, UriSerde}, + sinks::{prelude::*, util::UriSerde}, }; #[derive(Clone)] diff --git a/src/sinks/loki/sink.rs b/src/sinks/loki/sink.rs index b5871d4e876e4..1ba3cbee6268a 100644 --- a/src/sinks/loki/sink.rs +++ b/src/sinks/loki/sink.rs @@ -1,44 +1,25 @@ use std::{collections::HashMap, num::NonZeroUsize}; use bytes::{Bytes, BytesMut}; -use futures::{stream::BoxStream, StreamExt}; use once_cell::sync::Lazy; use regex::Regex; use snafu::Snafu; use tokio_util::codec::Encoder as _; -use vector_common::request_metadata::RequestMetadata; -use vector_core::{ - event::{Event, EventFinalizers, Finalizable, Value}, - partition::Partitioner, - sink::StreamSink, - stream::BatcherSettings, - ByteSizeOf, EstimatedJsonEncodedSizeOf, -}; use super::{ config::{LokiConfig, OutOfOrderAction}, event::{LokiBatchEncoder, LokiEvent, LokiRecord, PartitionKey}, service::{LokiRequest, LokiRetryLogic, LokiService}, }; +use crate::sinks::loki::config::{CompressionConfigAdapter, ExtendedCompression}; use crate::sinks::loki::event::LokiBatchEncoding; -use crate::sinks::{ - loki::config::{CompressionConfigAdapter, ExtendedCompression}, - util::metadata::RequestMetadataBuilder, -}; use crate::{ - codecs::{Encoder, Transformer}, http::{get_http_scheme_from_uri, HttpClient}, internal_events::{ LokiEventUnlabeledError, LokiOutOfOrderEventDroppedError, LokiOutOfOrderEventRewritten, - SinkRequestBuildError, TemplateRenderingError, - }, - sinks::util::{ - builder::SinkBuilderExt, - request_builder::EncodeResult, - service::{ServiceBuilderExt, Svc}, - Compression, RequestBuilder, + SinkRequestBuildError, }, - template::Template, + sinks::prelude::*, }; #[derive(Clone)] diff --git a/src/sinks/loki/tests.rs b/src/sinks/loki/tests.rs index 5661b0c6ec8b8..cf34b729684c1 100644 --- a/src/sinks/loki/tests.rs +++ b/src/sinks/loki/tests.rs @@ -1,13 +1,11 @@ -use futures::StreamExt; +use vector_core::config::proxy::ProxyConfig; use super::{config::LokiConfig, healthcheck::healthcheck, sink::LokiSink}; use crate::{ - config::ProxyConfig, - event::{Event, LogEvent}, http::HttpClient, + sinks::prelude::*, sinks::util::test::{build_test_server, load_sink}, test_util, - tls::TlsSettings, }; #[test] diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index bce42f6769ba6..b21c5749841c6 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -3,6 +3,7 @@ use enum_dispatch::enum_dispatch; use futures::future::BoxFuture; use snafu::Snafu; +pub mod prelude; pub mod util; #[cfg(feature = "sinks-amqp")] diff --git a/src/sinks/prelude.rs b/src/sinks/prelude.rs new file mode 100644 index 0000000000000..15f5d99376a0f --- /dev/null +++ b/src/sinks/prelude.rs @@ -0,0 +1,45 @@ +//! Prelude module for sinks which will re-export the symbols that most +//! stream based sinks are likely to use. + +pub use crate::{ + codecs::{Encoder, EncodingConfig, Transformer}, + config::{DataType, GenerateConfig, SinkConfig, SinkContext}, + event::{Event, LogEvent}, + internal_events::TemplateRenderingError, + sinks::util::retries::RetryLogic, + sinks::{ + util::{ + builder::SinkBuilderExt, + encoding::{self, write_all}, + metadata::RequestMetadataBuilder, + request_builder::EncodeResult, + service::{ServiceBuilderExt, Svc}, + BatchConfig, Compression, NoDefaultsBatchSettings, RequestBuilder, SinkBatchSettings, + TowerRequestConfig, + }, + Healthcheck, + }, + template::{Template, TemplateParseError}, + tls::TlsConfig, +}; +pub use async_trait::async_trait; +pub use futures::{future, future::BoxFuture, stream::BoxStream, FutureExt, StreamExt}; +pub use tower::{Service, ServiceBuilder}; +pub use vector_buffers::EventCount; +pub use vector_common::{ + finalization::{EventFinalizers, EventStatus, Finalizable}, + internal_event::CountByteSize, + json_size::JsonSize, + request_metadata::{MetaDescriptive, RequestMetadata}, +}; +pub use vector_config::configurable_component; +pub use vector_core::{ + config::{AcknowledgementsConfig, Input}, + event::Value, + partition::Partitioner, + schema::Requirement, + sink::{StreamSink, VectorSink}, + stream::{BatcherSettings, DriverResponse}, + tls::TlsSettings, + ByteSizeOf, EstimatedJsonEncodedSizeOf, +}; diff --git a/src/sinks/pulsar/config.rs b/src/sinks/pulsar/config.rs index d7b9f505175b6..7ec5ef601f32e 100644 --- a/src/sinks/pulsar/config.rs +++ b/src/sinks/pulsar/config.rs @@ -1,12 +1,9 @@ use crate::{ - codecs::EncodingConfig, - config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext}, schema, sinks::{ + prelude::*, pulsar::sink::{healthcheck, PulsarSink}, - Healthcheck, VectorSink, }, - template::Template, }; use codecs::{encoding::SerializerConfig, TextSerializerConfig}; use futures_util::FutureExt; @@ -21,7 +18,6 @@ use pulsar::{ use pulsar::{error::AuthenticationError, OperationRetryOptions}; use snafu::ResultExt; use vector_common::sensitive_string::SensitiveString; -use vector_config::configurable_component; use vector_core::config::DataType; use vrl::value::Kind; diff --git a/src/sinks/pulsar/request_builder.rs b/src/sinks/pulsar/request_builder.rs index ec104d0ebf508..b284ffef1ab26 100644 --- a/src/sinks/pulsar/request_builder.rs +++ b/src/sinks/pulsar/request_builder.rs @@ -1,17 +1,10 @@ use bytes::Bytes; use std::collections::HashMap; use std::io; -use vector_common::finalization::EventFinalizers; -use vector_common::request_metadata::RequestMetadata; -use crate::sinks::pulsar::encoder::PulsarEncoder; -use crate::sinks::pulsar::sink::PulsarEvent; -use crate::sinks::util::metadata::RequestMetadataBuilder; -use crate::sinks::util::request_builder::EncodeResult; -use crate::sinks::util::{Compression, RequestBuilder}; -use crate::{ - event::{Event, Finalizable}, - sinks::pulsar::service::PulsarRequest, +use crate::sinks::{ + prelude::*, + pulsar::{encoder::PulsarEncoder, service::PulsarRequest, sink::PulsarEvent}, }; #[derive(Clone)] diff --git a/src/sinks/pulsar/service.rs b/src/sinks/pulsar/service.rs index bb61dcee92ed3..b04d2eb0d13e5 100644 --- a/src/sinks/pulsar/service.rs +++ b/src/sinks/pulsar/service.rs @@ -3,21 +3,13 @@ use std::sync::Arc; use std::task::{Context, Poll}; use bytes::Bytes; -use futures::future::BoxFuture; use pulsar::producer::Message; use pulsar::{Error as PulsarError, Executor, MultiTopicProducer, ProducerOptions, Pulsar}; use tokio::sync::Mutex; -use tower::Service; use vector_common::internal_event::CountByteSize; -use vector_core::stream::DriverResponse; -use crate::event::{EventFinalizers, EventStatus, Finalizable}; use crate::internal_events::PulsarSendingError; -use crate::sinks::pulsar::request_builder::PulsarMetadata; -use vector_common::{ - json_size::JsonSize, - request_metadata::{MetaDescriptive, RequestMetadata}, -}; +use crate::sinks::{prelude::*, pulsar::request_builder::PulsarMetadata}; #[derive(Clone)] pub(super) struct PulsarRequest { diff --git a/src/sinks/pulsar/sink.rs b/src/sinks/pulsar/sink.rs index c8ab0bcae256c..8644aa561dc58 100644 --- a/src/sinks/pulsar/sink.rs +++ b/src/sinks/pulsar/sink.rs @@ -1,24 +1,11 @@ use async_trait::async_trait; use bytes::Bytes; -use futures::{stream::BoxStream, StreamExt}; use pulsar::{Error as PulsarError, Pulsar, TokioExecutor}; use serde::Serialize; use snafu::Snafu; use std::collections::HashMap; -use tower::ServiceBuilder; -use crate::{ - codecs::{Encoder, Transformer}, - event::Event, - sinks::util::SinkBuilderExt, - template::Template, -}; -use vector_buffers::EventCount; -use vector_common::{byte_size_of::ByteSizeOf, json_size::JsonSize}; -use vector_core::{ - event::{EstimatedJsonEncodedSizeOf, LogEvent}, - sink::StreamSink, -}; +use crate::sinks::prelude::*; use super::{ config::PulsarSinkConfig, encoder::PulsarEncoder, request_builder::PulsarRequestBuilder, diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index c2705f5deff5a..00dc6944bdbad 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -78,7 +78,7 @@ impl Encoder for (Transformer, crate::codecs::Encoder<()>) { /// * `writer` - The object implementing io::Write to write data to. /// * `n_events_pending` - The number of events that are dropped if this write fails. /// * `buf` - The buffer to write. -pub(crate) fn write_all( +pub fn write_all( writer: &mut dyn io::Write, n_events_pending: usize, buf: &[u8],