diff --git a/lib/vector-common/src/finalizer.rs b/lib/vector-common/src/finalizer.rs index dbf8497e7a3ef..15bc52ab3510d 100644 --- a/lib/vector-common/src/finalizer.rs +++ b/lib/vector-common/src/finalizer.rs @@ -119,6 +119,13 @@ where // Drop all the existing status receivers and start over. status_receivers = S::default(); }, + // Prefer to remove finalizers than to add new finalizers to prevent unbounded + // growth under load. + finished = status_receivers.next(), if !status_receivers.is_empty() => match finished { + Some((status, entry)) => yield (status, entry), + // The `is_empty` guard above prevents this from being reachable. + None => unreachable!(), + }, // Only poll for new entries until shutdown is flagged. new_entry = new_entries.recv() => match new_entry { Some((receiver, entry)) => { @@ -130,11 +137,6 @@ where // The end of the new entry channel signals shutdown None => break, }, - finished = status_receivers.next(), if !status_receivers.is_empty() => match finished { - Some((status, entry)) => yield (status, entry), - // The `is_empty` guard above prevents this from being reachable. - None => unreachable!(), - }, } } diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 96c789cc66919..c9418baab5940 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -392,6 +392,7 @@ async fn kafka_source( loop { tokio::select! { + biased; _ = &mut shutdown => break, entry = ack_stream.next() => if let Some((status, entry)) = entry { if status == BatchStatus::Delivered {