Skip to content

fix(#3076): Kafka sender enqueues batch before awaiting deliveries#3078

Merged
jeremydmiller merged 1 commit into
mainfrom
fix-3076-kafka-batch-throughput
Jun 11, 2026
Merged

fix(#3076): Kafka sender enqueues batch before awaiting deliveries#3078
jeremydmiller merged 1 commit into
mainfrom
fix-3076-kafka-batch-throughput

Conversation

@jeremydmiller

Copy link
Copy Markdown
Member

Closes #3076.

KafkaSenderProtocol.SendBatchAsync awaited each ProduceAsync inline, which forced a broker round-trip per message and effectively serialized throughput. Under a durable outbox driving 100k+ messages the serialized awaits dwarfed the on-broker work the producer settings (linger.ms, batch.size, compression) were tuned for.

The change

Two phases:

  1. Map every envelope to a Confluent Message<>. A mapping or serialization failure on any envelope before anything is on the wire is reported as MarkSerializationFailureAsync on the whole batch — covers the long-standing TODO -- separate try/catch here! marker the original loop carried.

  2. Enqueue every produce without awaiting, then Task.WhenAll on the captured produce tasks. The Confluent producer's internal accumulator can now fill from the full batch and emit a single ProduceRequest, so linger.ms + batch.size + compression do the coalescing the reporter expected.

Batch-level semantics stay all-or-nothing to fit ISenderCallback's batch-shaped API: every send acked → MarkSuccessfulAsync; any failure → MarkProcessingFailureAsync (the durable outbox layer retries the batch, receive-side idempotency dedupes). Per-envelope partial-success bookkeeping would need a callback-shape change that's out of scope for this fix — matches the choice the SQS sender already made (see SqsSenderProtocol.SendBatchAsync).

Scope

Only the BatchedSender path — EndpointMode.Buffered and EndpointMode.Durable. EndpointMode.Inline still uses InlineKafkaSender, which is unchanged.

// KafkaTopic.CreateSender
return Mode == EndpointMode.Inline
    ? new InlineKafkaSender(this)
    : new BatchedSender(this, new KafkaSenderProtocol(this), ...);

Test status

Ran the full Wolverine.Kafka.Tests suite (160 tests, ~15 min) against a local Confluent container with this fix and against clean main for baseline.

  • The full sweep is timing-sensitive against a single local Kafka container — both runs surfaced compliance-test timeouts that are intermittent.
  • For every "new failure" the with-fix sweep produced, I re-ran the test in isolation and either it passed under the fix or it also failed on clean main (e.g. BufferedSendingAndReceivingCompliance.can_schedule_retry).
  • Most notably, InlineSendingAndReceivingCompliance.* cannot regress from this PR by construction (different code path), yet some of those still appeared in the noise — proving the failures are local-container load artefacts, not real regressions.

So the change is safe to merge from the test perspective. The expected user-visible impact for the reporter is a large throughput uplift on durable-outbox / buffered Kafka publishing once their producer batching settings (linger.ms, batch.size, compression) finally have a populated accumulator to coalesce over.

🤖 Generated with Claude Code

KafkaSenderProtocol.SendBatchAsync awaited each ProduceAsync inline, which
forced a broker round-trip per message and effectively serialized
throughput. Under a durable outbox driving 100k+ messages the serialized
awaits dwarfed the on-broker work the producer settings (linger.ms,
batch.size, compression) were tuned for.

Refactor to two phases:

  1. Map every envelope to a Confluent Message<>. A mapping or
     serialization failure on any envelope before we've put anything on
     the wire is reported as MarkSerializationFailureAsync on the whole
     batch — covers the long-standing 'TODO -- separate try/catch here!'
     marker the original loop carried.

  2. Enqueue every produce without awaiting, then await Task.WhenAll on
     the captured produce tasks. The Confluent producer's internal
     accumulator can now fill from the full batch and emit a single
     ProduceRequest, so linger.ms + batch.size + compression do the
     coalescing the reporter expected.

Batch-level semantics stay all-or-nothing to fit ISenderCallback's
batch-shaped API: every send acked → MarkSuccessfulAsync; any failure →
MarkProcessingFailureAsync (the durable outbox layer retries the batch,
receive-side idempotency dedupes). Per-envelope partial-success
bookkeeping would need a callback-shape change that's out of scope for
this fix — matches the choice the SQS sender already made.

Scope: only the Buffered + Durable endpoints (the BatchedSender path).
EndpointMode.Inline still uses InlineKafkaSender, which is unchanged.

Ran the full Kafka test suite against a local Confluent container. The
delta vs main is dominated by pre-existing flakes in the 15-min compliance
sweep against a single-container broker — re-running the suspected new
failures in isolation either passes them under the fix or reproduces the
same failure on clean main (e.g. can_schedule_retry), confirming no new
real regressions.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Kafka sender awaits each message sequentially inside outgoing batches

1 participant