Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions lib/vector-common/src/finalizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ where
// Drop all the existing status receivers and start over.
status_receivers = S::default();
},
// Prefer to remove finalizers than to add new finalizers to prevent unbounded
// growth under load.
finished = status_receivers.next(), if !status_receivers.is_empty() => match finished {
Some((status, entry)) => yield (status, entry),
// The `is_empty` guard above prevents this from being reachable.
None => unreachable!(),
},
// Only poll for new entries until shutdown is flagged.
new_entry = new_entries.recv() => match new_entry {
Some((receiver, entry)) => {
Expand All @@ -130,11 +137,6 @@ where
// The end of the new entry channel signals shutdown
None => break,
},
finished = status_receivers.next(), if !status_receivers.is_empty() => match finished {
Some((status, entry)) => yield (status, entry),
// The `is_empty` guard above prevents this from being reachable.
None => unreachable!(),
},
}
}

Expand Down
1 change: 1 addition & 0 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ async fn kafka_source(

loop {
tokio::select! {
biased;
_ = &mut shutdown => break,
entry = ack_stream.next() => if let Some((status, entry)) = entry {
if status == BatchStatus::Delivered {
Expand Down