diff --git a/src/sinks/aws_kinesis/config.rs b/src/sinks/aws_kinesis/config.rs index e817d98881985..34b9f0347e27e 100644 --- a/src/sinks/aws_kinesis/config.rs +++ b/src/sinks/aws_kinesis/config.rs @@ -58,6 +58,13 @@ pub struct KinesisSinkBaseConfig { #[serde(default)] pub auth: AwsAuthentication, + /// Whether or not to retry successful requests containing partial failures. + /// + /// Note: this will cause duplicates in firehose. + #[serde(default)] + #[configurable(metadata(docs::advanced))] + pub request_retry_partial: bool, + #[configurable(derived)] #[serde( default, @@ -83,6 +90,7 @@ pub async fn build_sink( partition_key_field: Option, batch_settings: BatcherSettings, client: C, + retry_logic: RT, ) -> crate::Result where C: SendRecord + Clone + Send + Sync + 'static, @@ -98,7 +106,7 @@ where let region = config.region.region(); let service = ServiceBuilder::new() - .settings::>(request_limits, RT::default()) + .settings::>(request_limits, retry_logic) .service(KinesisService:: { client, stream_name: config.stream_name.clone(), diff --git a/src/sinks/aws_kinesis/firehose/config.rs b/src/sinks/aws_kinesis/firehose/config.rs index 8255c6bf220bd..ef46e23277824 100644 --- a/src/sinks/aws_kinesis/firehose/config.rs +++ b/src/sinks/aws_kinesis/firehose/config.rs @@ -8,6 +8,7 @@ use futures::FutureExt; use snafu::Snafu; use vector_config::configurable_component; +use crate::sinks::util::retries::RetryAction; use crate::{ aws::{create_client, is_retriable_error, ClientBuilder}, config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, @@ -141,6 +142,9 @@ impl SinkConfig for KinesisFirehoseSinkConfig { None, batch_settings, KinesisFirehoseClient { client }, + KinesisRetryLogic { + retry_partial: self.base.request_retry_partial, + }, ) .await?; @@ -167,7 +171,9 @@ impl GenerateConfig for KinesisFirehoseSinkConfig { } #[derive(Clone, Default)] -struct KinesisRetryLogic; +struct KinesisRetryLogic { + retry_partial: bool, +} impl RetryLogic for KinesisRetryLogic { type Error = SdkError; @@ -181,4 +187,13 @@ impl RetryLogic for KinesisRetryLogic { } is_retriable_error(error) } + + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + if self.retry_partial && response.failure_count > 0 { + let msg = format!("partial error count {}", response.failure_count); + return RetryAction::Retry(msg.into()); + } else { + RetryAction::Successful + } + } } diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 49d1ee821f5c3..b83c84ece9c02 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -1,3 +1,5 @@ +use crate::sinks::aws_kinesis::KinesisResponse; +use aws_sdk_firehose::output::PutRecordBatchOutput; use aws_sdk_firehose::types::{Blob, SdkError}; use bytes::Bytes; use tracing::Instrument; @@ -46,7 +48,13 @@ impl SendRecord for KinesisFirehoseClient { type T = KinesisRecord; type E = KinesisError; - async fn send(&self, records: Vec, stream_name: String) -> Option> { + async fn send( + &self, + records: Vec, + stream_name: String, + ) -> Result> { + let rec_count = records.len().clone(); + self.client .put_record_batch() .set_records(Some(records)) @@ -54,6 +62,10 @@ impl SendRecord for KinesisFirehoseClient { .send() .instrument(info_span!("request").or_current()) .await - .err() + .map(|output: PutRecordBatchOutput| KinesisResponse { + count: rec_count, + failure_count: output.failed_put_count().unwrap_or(0) as usize, + events_byte_size: 0, + }) } } diff --git a/src/sinks/aws_kinesis/firehose/tests.rs b/src/sinks/aws_kinesis/firehose/tests.rs index a15fb47a4e794..54c55d9efee1d 100644 --- a/src/sinks/aws_kinesis/firehose/tests.rs +++ b/src/sinks/aws_kinesis/firehose/tests.rs @@ -33,6 +33,7 @@ async fn check_batch_size() { request: Default::default(), tls: None, auth: Default::default(), + request_retry_partial: false, acknowledgements: Default::default(), }; @@ -62,6 +63,7 @@ async fn check_batch_events() { request: Default::default(), tls: None, auth: Default::default(), + request_retry_partial: false, acknowledgements: Default::default(), }; diff --git a/src/sinks/aws_kinesis/record.rs b/src/sinks/aws_kinesis/record.rs index 03ad11c710416..7b4edb8df7790 100644 --- a/src/sinks/aws_kinesis/record.rs +++ b/src/sinks/aws_kinesis/record.rs @@ -1,3 +1,4 @@ +use crate::sinks::aws_kinesis::KinesisResponse; use async_trait::async_trait; use aws_smithy_client::SdkError; use bytes::Bytes; @@ -24,5 +25,9 @@ pub trait SendRecord { type E; /// Sends the records. - async fn send(&self, records: Vec, stream_name: String) -> Option>; + async fn send( + &self, + records: Vec, + stream_name: String, + ) -> Result>; } diff --git a/src/sinks/aws_kinesis/service.rs b/src/sinks/aws_kinesis/service.rs index a1058806288c4..53bf60dc90e4a 100644 --- a/src/sinks/aws_kinesis/service.rs +++ b/src/sinks/aws_kinesis/service.rs @@ -40,8 +40,9 @@ where } pub struct KinesisResponse { - count: usize, - events_byte_size: usize, + pub(crate) count: usize, + pub(crate) failure_count: usize, + pub(crate) events_byte_size: usize, } impl DriverResponse for KinesisResponse { @@ -73,7 +74,6 @@ where // Emission of internal events for errors and dropped events is handled upstream by the caller. fn call(&mut self, requests: BatchKinesisRequest) -> Self::Future { let events_byte_size = requests.get_metadata().events_byte_size(); - let count = requests.get_metadata().event_count(); let records = requests .events @@ -85,16 +85,10 @@ where let stream_name = self.stream_name.clone(); Box::pin(async move { - // Returning a Result (a trait that implements Try) is not a stable feature, - // so instead we have to explicitly check for error and return. - // https://github.com/rust-lang/rust/issues/84277 - if let Some(e) = client.send(records, stream_name).await { - return Err(e); - } - - Ok(KinesisResponse { - count, - events_byte_size, + client.send(records, stream_name).await.map(|mut r| { + // augment the response + r.events_byte_size = events_byte_size; + r }) }) } diff --git a/src/sinks/aws_kinesis/streams/config.rs b/src/sinks/aws_kinesis/streams/config.rs index eb4f7c9e3b66f..5dc3171990a85 100644 --- a/src/sinks/aws_kinesis/streams/config.rs +++ b/src/sinks/aws_kinesis/streams/config.rs @@ -6,6 +6,7 @@ use futures::FutureExt; use snafu::Snafu; use vector_config::{component::GenerateConfig, configurable_component}; +use crate::sinks::util::retries::RetryAction; use crate::{ aws::{create_client, is_retriable_error, ClientBuilder}, config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext}, @@ -148,6 +149,9 @@ impl SinkConfig for KinesisStreamsSinkConfig { self.partition_key_field.clone(), batch_settings, KinesisStreamClient { client }, + KinesisRetryLogic { + retry_partial: self.base.request_retry_partial, + }, ) .await?; @@ -174,7 +178,9 @@ impl GenerateConfig for KinesisStreamsSinkConfig { } } #[derive(Default, Clone)] -struct KinesisRetryLogic; +struct KinesisRetryLogic { + retry_partial: bool, +} impl RetryLogic for KinesisRetryLogic { type Error = SdkError; @@ -188,6 +194,15 @@ impl RetryLogic for KinesisRetryLogic { } is_retriable_error(error) } + + fn should_retry_response(&self, response: &Self::Response) -> RetryAction { + if self.retry_partial && response.failure_count > 0 { + let msg = format!("partial error count {}", response.failure_count); + return RetryAction::Retry(msg.into()); + } else { + RetryAction::Successful + } + } } #[cfg(test)] diff --git a/src/sinks/aws_kinesis/streams/record.rs b/src/sinks/aws_kinesis/streams/record.rs index 67eba50d9aff2..5c47dfd024d1d 100644 --- a/src/sinks/aws_kinesis/streams/record.rs +++ b/src/sinks/aws_kinesis/streams/record.rs @@ -1,3 +1,5 @@ +use crate::sinks::aws_kinesis::KinesisResponse; +use aws_sdk_kinesis::output::PutRecordsOutput; use aws_sdk_kinesis::types::{Blob, SdkError}; use bytes::Bytes; use tracing::Instrument; @@ -62,7 +64,12 @@ impl SendRecord for KinesisStreamClient { type T = KinesisRecord; type E = KinesisError; - async fn send(&self, records: Vec, stream_name: String) -> Option> { + async fn send( + &self, + records: Vec, + stream_name: String, + ) -> Result> { + let rec_count = records.len().clone(); self.client .put_records() .set_records(Some(records)) @@ -70,6 +77,10 @@ impl SendRecord for KinesisStreamClient { .send() .instrument(info_span!("request").or_current()) .await - .err() + .map(|output: PutRecordsOutput| KinesisResponse { + count: rec_count, + failure_count: output.failed_record_count().unwrap_or(0) as usize, + events_byte_size: 0, + }) } }