Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 42 additions & 3 deletions src/sinks/aws_kinesis/firehose/record.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord};
use aws_sdk_firehose::error::PutRecordBatchError;
use aws_sdk_firehose::output::PutRecordBatchOutput;
use aws_sdk_firehose::types::{Blob, SdkError};
use bytes::Bytes;
#[cfg(not(test))]
use tokio::time::{sleep, Duration};
use tracing::Instrument;

use super::{KinesisClient, KinesisError, KinesisRecord, Record, SendRecord};

#[derive(Clone)]
pub struct KinesisFirehoseRecord {
pub record: KinesisRecord,
Expand Down Expand Up @@ -47,13 +50,49 @@ impl SendRecord for KinesisFirehoseClient {
type E = KinesisError;

async fn send(&self, records: Vec<Self::T>, stream_name: String) -> Option<SdkError<Self::E>> {
let mut r = self.inner_send(records.clone(), stream_name.clone()).await;

for _ in 1..=3 {
if let Ok(resp) = &r {
if resp.failed_put_count().unwrap_or(0) > 0 {
#[cfg(not(test))] // the wait fails during test for some reason.
Copy link
Contributor Author

@jasongoodwin jasongoodwin Mar 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of odd, but I noticed that the tokio sleep causes a test failure, even if it isn't executed.

sleep(Duration::from_millis(100)).await;
Copy link
Contributor Author

@jasongoodwin jasongoodwin Mar 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There isn't any configuration or backoff at all. Really, just an example at this point to hopefully get some eyes/input.


let mut failed_records = vec![];
let itr = records
.clone()
.into_iter()
.zip(resp.request_responses().unwrap().into_iter());
for (rec, response) in itr {
// TODO can just filter
if response.error_code().is_some() {
failed_records.push(rec.clone());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all kinds of cloning and unidiomatic rust - this is just an example for now.

}
}

r = self.inner_send(failed_records, stream_name.clone()).await;
} else {
return r.err();
}
}
}

return r.err();
}
}

impl KinesisFirehoseClient {
async fn inner_send(
&self,
records: Vec<KinesisRecord>,
stream_name: String,
) -> Result<PutRecordBatchOutput, SdkError<PutRecordBatchError>> {
self.client
.put_record_batch()
.set_records(Some(records))
.delivery_stream_name(stream_name)
.send()
.instrument(info_span!("request").or_current())
.await
.err()
}
}