diff --git a/src/sinks/aws_kinesis/firehose/record.rs b/src/sinks/aws_kinesis/firehose/record.rs index 49d1ee821f5c3..5128c278a882c 100644 --- a/src/sinks/aws_kinesis/firehose/record.rs +++ b/src/sinks/aws_kinesis/firehose/record.rs @@ -1,9 +1,12 @@ +use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord}; +use aws_sdk_firehose::error::PutRecordBatchError; +use aws_sdk_firehose::output::PutRecordBatchOutput; use aws_sdk_firehose::types::{Blob, SdkError}; use bytes::Bytes; +#[cfg(not(test))] +use tokio::time::{sleep, Duration}; use tracing::Instrument; -use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord}; - #[derive(Clone)] pub struct KinesisFirehoseRecord { pub record: KinesisRecord, @@ -47,6 +50,43 @@ impl SendRecord for KinesisFirehoseClient { type E = KinesisError; async fn send(&self, records: Vec, stream_name: String) -> Option> { + let mut r = self.inner_send(records.clone(), stream_name.clone()).await; + + for _ in 1..=3 { + if let Ok(resp) = &r { + if resp.failed_put_count().unwrap_or(0) > 0 { + #[cfg(not(test))] // the wait fails during test for some reason. + sleep(Duration::from_millis(100)).await; + + let mut failed_records = vec![]; + let itr = records + .clone() + .into_iter() + .zip(resp.request_responses().unwrap().into_iter()); + for (rec, response) in itr { + // TODO can just filter + if response.error_code().is_some() { + failed_records.push(rec.clone()); + } + } + + r = self.inner_send(failed_records, stream_name.clone()).await; + } else { + return r.err(); + } + } + } + + return r.err(); + } +} + +impl KinesisFirehoseClient { + async fn inner_send( + &self, + records: Vec, + stream_name: String, + ) -> Result> { self.client .put_record_batch() .set_records(Some(records)) @@ -54,6 +94,5 @@ impl SendRecord for KinesisFirehoseClient { .send() .instrument(info_span!("request").or_current()) .await - .err() } }