From e5eb21c54471aed12687885a0420834d5a7994a4 Mon Sep 17 00:00:00 2001 From: Spencer Gilbert Date: Fri, 12 May 2023 17:44:25 -0400 Subject: [PATCH 1/3] fix(kafka source): Bias the tokio::select --- src/sources/kafka.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index 96c789cc66919..c9418baab5940 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -392,6 +392,7 @@ async fn kafka_source( loop { tokio::select! { + biased; _ = &mut shutdown => break, entry = ack_stream.next() => if let Some((status, entry)) = entry { if status == BatchStatus::Delivered { From a273ff786de2be5ee52d25fba13e3ff8b328a3c3 Mon Sep 17 00:00:00 2001 From: Spencer Gilbert Date: Fri, 12 May 2023 17:45:16 -0400 Subject: [PATCH 2/3] chore: Reorder finalizer tokio::select to remove existing finalizers before adding new finalizers --- lib/vector-common/src/finalizer.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/vector-common/src/finalizer.rs b/lib/vector-common/src/finalizer.rs index dbf8497e7a3ef..b91325a7e39d0 100644 --- a/lib/vector-common/src/finalizer.rs +++ b/lib/vector-common/src/finalizer.rs @@ -119,6 +119,12 @@ where // Drop all the existing status receivers and start over. status_receivers = S::default(); }, + // Prefer to remove finalizers than to add new finalizers. + finished = status_receivers.next(), if !status_receivers.is_empty() => match finished { + Some((status, entry)) => yield (status, entry), + // The `is_empty` guard above prevents this from being reachable. + None => unreachable!(), + }, // Only poll for new entries until shutdown is flagged. new_entry = new_entries.recv() => match new_entry { Some((receiver, entry)) => { @@ -130,11 +136,6 @@ where // The end of the new entry channel signals shutdown None => break, }, - finished = status_receivers.next(), if !status_receivers.is_empty() => match finished { - Some((status, entry)) => yield (status, entry), - // The `is_empty` guard above prevents this from being reachable. - None => unreachable!(), - }, } } From deda7805aca4d66cae36f4bbfcfa0131c2187908 Mon Sep 17 00:00:00 2001 From: Spencer Gilbert Date: Mon, 15 May 2023 10:56:23 -0400 Subject: [PATCH 3/3] +pr feedback on comment Signed-off-by: Spencer Gilbert --- lib/vector-common/src/finalizer.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/vector-common/src/finalizer.rs b/lib/vector-common/src/finalizer.rs index b91325a7e39d0..15bc52ab3510d 100644 --- a/lib/vector-common/src/finalizer.rs +++ b/lib/vector-common/src/finalizer.rs @@ -119,7 +119,8 @@ where // Drop all the existing status receivers and start over. status_receivers = S::default(); }, - // Prefer to remove finalizers than to add new finalizers. + // Prefer to remove finalizers than to add new finalizers to prevent unbounded + // growth under load. finished = status_receivers.next(), if !status_receivers.is_empty() => match finished { Some((status, entry)) => yield (status, entry), // The `is_empty` guard above prevents this from being reachable.