Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
156bed4
chore(deps): update dependency tunit to 1.14.0
renovate-bot Feb 15, 2026
80d2dce
chore(deps): update dependency tunit to 1.14.0
renovate-bot Feb 15, 2026
8e0a1cd
Merge branch 'renovate/tunit' of https://github.com/thomhurst/Dekaf i…
thomhurst Feb 15, 2026
c752b02
feat: enhance CI workflow with HangDump artifact upload and timeout a…
thomhurst Feb 15, 2026
4aeb234
fix: prevent ProduceAsync hang when BrokerSender send loop exits
thomhurst Feb 15, 2026
a203df2
fix: complete BrokerSender channel when send loop exits
thomhurst Feb 15, 2026
17a5620
fix: make BrokerSender send loop resilient to transient errors
thomhurst Feb 15, 2026
0f5703b
fix: serialize ResponseParsingContext tests to prevent thread-local s…
thomhurst Feb 15, 2026
a4266a7
fix: prevent batch loss and enforce delivery deadline in carry-over
thomhurst Feb 15, 2026
44186bf
fix: prevent batch loss and enforce delivery deadline in error recovery
thomhurst Feb 15, 2026
d5d8b57
refactor: CancellationTokenSourcePool with auto-return via Dispose
thomhurst Feb 15, 2026
72ff16f
fix: prevent self-perpetuating NRE in ProcessCompletedResponses
thomhurst Feb 15, 2026
fcde049
fix: prevent indefinite hang when carry-over batches are all muted
thomhurst Feb 15, 2026
8bffbe5
fix: guard PooledCancellationTokenSource against double-dispose
thomhurst Feb 15, 2026
1022f7f
revert: remove production code changes, keep only TUnit upgrade
thomhurst Feb 15, 2026
a1e64c2
fix: prevent channel starvation and carry-over deadline leaks in Brok…
thomhurst Feb 15, 2026
e5ecc22
fix: restore per-test timeout for integration tests
thomhurst Feb 15, 2026
324ed58
fix(ci): remove --timeout 10m from integration tests pipeline
thomhurst Feb 15, 2026
0169229
fix(tests): limit parallelism for RealWorld messaging tests
thomhurst Feb 15, 2026
31239e5
fix(tests): add 2-minute timeout to FanOutPatternTests to prevent CI …
thomhurst Feb 15, 2026
95b707e
fix(ci): handle TUnit 1.14+ process exit hang in pipeline
thomhurst Feb 16, 2026
bf9fff8
fix(tests): assert pooled buffer data before returning to pool
thomhurst Feb 16, 2026
1a0010e
fix: complete BrokerSender channel on send loop exit to prevent produ…
thomhurst Feb 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ jobs:
if-no-files-found: ignore
retention-days: 30

- name: Upload HangDump artifacts
uses: actions/upload-artifact@v6
if: always()
with:
name: hangdump-artifacts-${{ matrix.category }}
path: |
**/*.dmp
if-no-files-found: ignore
retention-days: 7

integration-tests:
name: Integration Tests (${{ matrix.category }})
needs: build-and-unit-test
Expand Down
138 changes: 120 additions & 18 deletions src/Dekaf/Producer/BrokerSender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ public BrokerSender(
_sendLoopTask = SendLoopAsync(_cts.Token);
}

/// <summary>
/// Indicates whether the send loop is still running. Once this becomes false the
/// sender can no longer process batches and the owner should replace this
/// BrokerSender with a fresh instance.
/// </summary>
internal bool IsAlive
{
    get { return !_sendLoopTask.IsCompleted; }
}

/// <summary>
/// Enqueues a batch for sending to this broker.
/// Fast path: TryWrite succeeds when the bounded channel has capacity.
Expand Down Expand Up @@ -228,15 +234,11 @@ public void Enqueue(ReadyBatch batch)
_ = _batchChannel.Writer.WriteAsync(batch, _cts.Token).AsTask().ContinueWith(
static (task, state) =>
{
if (task.IsFaulted || task.IsCanceled)
{
var b = (ReadyBatch)state!;
// Can't use FailEnqueuedBatch (instance method), inline the cleanup
try { b.Fail(task.Exception?.InnerException ?? new OperationCanceledException()); }
catch { /* Observe */ }
}
var (sender, b) = ((BrokerSender, ReadyBatch))state!;
try { sender.FailEnqueuedBatch(b); }
catch { /* Observe - disposal may have already cleaned up */ }
},
batch,
(this, batch),
CancellationToken.None,
TaskContinuationOptions.NotOnRanToCompletion,
TaskScheduler.Default);
Expand Down Expand Up @@ -414,22 +416,30 @@ await _bumpEpoch((short)staleEpoch, cancellationToken)
// 1. No carry-over: normal fast path — read freely for maximum throughput.
// 2. Carry-over was all muted (coalescedCount==0): read to find sendable batches
// — this prevents the starvation livelock.
// When carry-over produced a coalesced batch, skip channel reads to prevent
// carry-over growth. Carry-over drains by 1 per iteration; reading more from
// the channel (duplicate-partition for single-partition workloads) would cause
// unbounded growth and O(n²) scanning.
// When carry-over produced a coalesced batch, read at most 1 from channel
// to prevent carry-over growth while still draining the channel gradually.
// Without this limit, duplicate-partition batches (single-partition workloads)
// would cause unbounded carry-over growth and O(n²) scanning.
// With the limit of 1, carry-over growth is bounded by channel capacity
// (MaxInFlightRequestsPerConnection × 2) and drains naturally.
// No sorting needed — retry batches no longer come through the channel.
if (!hadCarryOver || coalescedCount == 0)
{
var channelReadLimit = (hadCarryOver && coalescedCount > 0) ? 1 : maxCoalesce;
var channelReads = 0;
while (channelReads < maxCoalesce && channelReader.TryRead(out var channelBatch))
while (channelReads < channelReadLimit && channelReader.TryRead(out var channelBatch))
{
channelReads++;
CoalesceBatch(channelBatch, coalescedBatches, ref coalescedCount,
coalescedPartitions, newCarryOver);
}
}

// Sweep carry-over for expired batches. This prevents muted batches
// from sitting indefinitely while their partition's retry cycles, and
// ensures channel batches that were read above are deadline-checked.
if (newCarryOver.Count > 0)
SweepExpiredCarryOver(newCarryOver);

// Send or wait
if (coalescedCount > 0)
{
Expand Down Expand Up @@ -501,26 +511,47 @@ await SendCoalescedAsync(
reusableWaitTasks.Add(responseSignal.Task);
}

// Calculate earliest backoff from carry-over
// Calculate earliest backoff and delivery deadline from carry-over
if (newCarryOver.Count > 0)
{
var earliestBackoff = long.MaxValue;
var earliestDeadlineTicks = long.MaxValue;
var now = Stopwatch.GetTimestamp();

for (var i = 0; i < newCarryOver.Count; i++)
{
if (newCarryOver[i].RetryNotBefore > 0 && newCarryOver[i].RetryNotBefore < earliestBackoff)
earliestBackoff = newCarryOver[i].RetryNotBefore;

var deadlineTicks = newCarryOver[i].StopwatchCreatedTicks +
(long)(_options.DeliveryTimeoutMs * (Stopwatch.Frequency / 1000.0));
if (deadlineTicks < earliestDeadlineTicks)
earliestDeadlineTicks = deadlineTicks;
}

if (earliestBackoff < long.MaxValue)
{
var delayTicks = earliestBackoff - Stopwatch.GetTimestamp();
var delayTicks = earliestBackoff - now;
if (delayTicks > 0)
{
var delayMs = (int)(delayTicks * 1000.0 / Stopwatch.Frequency);
reusableWaitTasks.Add(Task.Delay(Math.Max(1, delayMs), cancellationToken));
}
// else: backoff already elapsed, will be processed next iteration
}

// Delivery deadline timer — ensures the loop wakes to expire
// timed-out batches even when no other signals fire.
if (earliestDeadlineTicks < long.MaxValue)
{
var delayTicks = earliestDeadlineTicks - now;
if (delayTicks > 0)
{
var delayMs = (int)(delayTicks * 1000.0 / Stopwatch.Frequency);
reusableWaitTasks.Add(Task.Delay(Math.Max(1, delayMs), cancellationToken));
}
// else: deadline already passed, will be swept next iteration
}
}

await Task.WhenAny(reusableWaitTasks).ConfigureAwait(false);
Expand All @@ -544,6 +575,41 @@ await SendCoalescedAsync(
}
finally
{
// Complete the channel FIRST to prevent KafkaProducer.SenderLoopAsync from
// writing new batches after we drain. Without this, EnqueueAsync blocks forever
// on the bounded channel because nobody is reading from it, causing producer hangs.
_batchChannel.Writer.TryComplete();

// Fail batches awaiting retry (set by SendCoalescedAsync catch blocks).
// Without this cleanup, completion sources in these batches are never resolved.
for (var i = 0; i < _sendFailedRetries.Count; i++)
{
CompleteInflightEntry(_sendFailedRetries[i]);
try { _sendFailedRetries[i].Fail(new ObjectDisposedException(nameof(BrokerSender))); }
catch { /* Observe */ }
CleanupBatch(_sendFailedRetries[i]);
}
_sendFailedRetries.Clear();

// Fail pending responses — the send loop won't process them anymore.
// Batches in pending responses have completion sources that callers are awaiting.
for (var i = 0; i < _pendingResponses.Count; i++)
{
var pr = _pendingResponses[i];
for (var j = 0; j < pr.Count; j++)
{
if (pr.Batches[j] is not null)
{
CompleteInflightEntry(pr.Batches[j]);
try { pr.Batches[j].Fail(new ObjectDisposedException(nameof(BrokerSender))); }
catch { /* Observe */ }
CleanupBatch(pr.Batches[j]);
}
}
ArrayPool<ReadyBatch>.Shared.Return(pr.Batches, clearArray: true);
}
_pendingResponses.Clear();

// Fail any carry-over batches that couldn't be sent.
// Drain both swappable lists — if an exception occurred mid-iteration,
// batches may be in either list depending on timing.
Expand Down Expand Up @@ -1240,6 +1306,41 @@ private void ReleaseInFlightSlot()
Interlocked.Exchange(ref _inFlightSlotAvailable, null)?.TrySetResult();
}

/// <summary>
/// Sweeps carry-over for batches that have exceeded their delivery deadline.
/// Prevents muted batches from sitting indefinitely while their partition's retry cycles.
/// Called from the single-threaded send loop after coalescing.
/// </summary>
/// <param name="carryOver">Carry-over list to sweep; expired batches are failed and removed in place.</param>
private void SweepExpiredCarryOver(List<ReadyBatch> carryOver)
{
    var now = Stopwatch.GetTimestamp();

    // Hoisted loop invariant: the delivery timeout converted from milliseconds to
    // stopwatch ticks. Uses the same double-precision conversion as the deadline
    // computation elsewhere in the send loop so both agree on the deadline.
    var deliveryTimeoutTicks = (long)(_options.DeliveryTimeoutMs * (Stopwatch.Frequency / 1000.0));

    // Iterate backwards so RemoveAt(i) never shifts an element we have yet to visit.
    for (var i = carryOver.Count - 1; i >= 0; i--)
    {
        var batch = carryOver[i];
        var deliveryDeadlineTicks = batch.StopwatchCreatedTicks + deliveryTimeoutTicks;

        if (now < deliveryDeadlineTicks)
            continue; // Still within its delivery deadline — keep it in carry-over.

        // Unmute partition for retry batches (they caused the mute).
        // Non-retry muted batches: don't unmute — the retry batch for this
        // partition may still be in play and will unmute on its own expiry.
        if (batch.IsRetry)
        {
            batch.IsRetry = false;
            batch.RetryNotBefore = 0;
            UnmutePartition(batch.TopicPartition);
        }

        LogDeliveryTimeoutExceeded(_brokerId, batch.TopicPartition.Topic,
            batch.TopicPartition.Partition);
        FailAndCleanupBatch(batch, new TimeoutException(
            $"Delivery timeout exceeded for {batch.TopicPartition}"));
        carryOver.RemoveAt(i);
    }
}

private void FailCarryOverBatches(List<ReadyBatch> carryOver)
{
for (var i = 0; i < carryOver.Count; i++)
Expand Down Expand Up @@ -1290,8 +1391,9 @@ public async ValueTask DisposeAsync()

LogDisposing(_brokerId);

// Complete channel — send loop will see channel completed and exit
_batchChannel.Writer.Complete();
// Complete channel — send loop will see channel completed and exit.
// Use TryComplete: the send loop's finally block may have already completed it.
_batchChannel.Writer.TryComplete();

// Cancel CTS FIRST so WaitToReadAsync is interrupted promptly.
await _cts.CancelAsync().ConfigureAwait(false);
Expand Down
37 changes: 34 additions & 3 deletions src/Dekaf/Producer/KafkaProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2483,15 +2483,43 @@ private void CompleteInflightEntry(ReadyBatch batch)
/// <summary>
/// Gets or creates a BrokerSender for the given broker ID.
/// Each broker gets a dedicated sender with its own channel and single-threaded send loop.
/// If the existing BrokerSender's send loop has exited, replaces it with a fresh one.
/// </summary>
private BrokerSender GetOrCreateBrokerSender(int brokerId)
{
    var sender = _brokerSenders.GetOrAdd(brokerId, CreateBrokerSender);

    if (sender.IsAlive)
        return sender;

    // Send loop exited — replace with a fresh BrokerSender.
    // This handles transient connection errors that killed the send loop.
    LogBrokerSenderReplaced(brokerId);
    var replacement = CreateBrokerSender(brokerId);
    if (_brokerSenders.TryUpdate(brokerId, replacement, sender))
    {
        // Dispose old sender asynchronously (its finally block already cleaned up).
        DisposeSenderInBackground(sender);
        return replacement;
    }

    // Another thread replaced it concurrently — dispose ours, use theirs.
    // Previously this path discarded the ValueTask (`_ = replacement.DisposeAsync();`)
    // without observing faults; route it through the same observed fire-and-forget
    // helper as the success path so a faulted disposal can never surface as an
    // UnobservedTaskException.
    DisposeSenderInBackground(replacement);
    return _brokerSenders.GetOrAdd(brokerId, CreateBrokerSender);
}

/// <summary>
/// Fire-and-forget disposal of a BrokerSender. Attaches a fault-only continuation
/// that observes any disposal exception so it cannot escalate to UnobservedTaskException.
/// </summary>
private static void DisposeSenderInBackground(BrokerSender sender)
{
    _ = sender.DisposeAsync().AsTask().ContinueWith(static (t, _) =>
    {
        // Observe any disposal exceptions to prevent UnobservedTaskException
        _ = t.Exception;
    }, null, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
}

private BrokerSender CreateBrokerSender(int brokerId)
{
// Epoch bump recovery is only for non-transactional producers.
// Transactional producers manage epochs via InitTransactionsAsync.
var isNonTransactional = _options.TransactionalId is null;

return _brokerSenders.GetOrAdd(brokerId, id => new BrokerSender(
id,
return new BrokerSender(
brokerId,
_connectionPool,
_metadataManager,
_accumulator,
Expand All @@ -2507,7 +2535,7 @@ private BrokerSender GetOrCreateBrokerSender(int brokerId)
getCurrentEpoch: isNonTransactional ? () => _producerEpoch : null,
RerouteBatchToCurrentLeader,
_interceptors is not null ? InvokeOnAcknowledgementForBatch : null,
_logger));
_logger);
}

/// <summary>
Expand Down Expand Up @@ -3156,6 +3184,9 @@ await _senderTask
[LoggerMessage(Level = LogLevel.Error, Message = "Failed to dispose broker sender")]
private partial void LogDisposeBrokerSenderFailed(Exception ex);

[LoggerMessage(Level = LogLevel.Warning, Message = "BrokerSender for broker {BrokerId} send loop exited — replacing with fresh sender")]
private partial void LogBrokerSenderReplaced(int brokerId);

[LoggerMessage(Level = LogLevel.Trace, Message = "Batch routed: {Topic}-{Partition} -> broker {BrokerId}")]
private partial void LogBatchRouted(string topic, int partition, int brokerId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<PackageReference Include="Google.Protobuf" Version="3.*" />
<PackageReference Include="Grpc.Tools" Version="2.*" PrivateAssets="All" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="10.0.0-preview.*" />
<PackageReference Include="TUnit" Version="1.13.60" />
<PackageReference Include="TUnit" Version="1.14.0" />
<PackageReference Include="Testcontainers.Kafka" Version="*" />
<PackageReference Include="Microsoft.Testing.Extensions.HangDump" Version="*" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace Dekaf.Tests.Integration.RealWorld;
/// These simulate real workflows: consume from input, transform, produce to output.
/// </summary>
[Category("Messaging")]
[ParallelLimiter<RealWorldMessagingLimit>]
public sealed class EventPipelineTests(KafkaTestContainer kafka) : KafkaIntegrationTest(kafka)
{
[Test]
Expand Down
12 changes: 8 additions & 4 deletions tests/Dekaf.Tests.Integration/RealWorld/FanOutPatternTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Dekaf.Consumer;
using Dekaf.Producer;

#pragma warning disable CA2016 // Timeout cancellation token is a safety net; internal operations have their own timeouts

namespace Dekaf.Tests.Integration.RealWorld;

/// <summary>
Expand All @@ -9,10 +11,12 @@ namespace Dekaf.Tests.Integration.RealWorld;
/// Common in event-driven architectures where different services need the same data.
/// </summary>
[Category("Messaging")]
[ParallelLimiter<RealWorldMessagingLimit>]
[Timeout(120_000)] // 2 minutes — prevents individual test hangs from blocking CI
public sealed class FanOutPatternTests(KafkaTestContainer kafka) : KafkaIntegrationTest(kafka)
{
[Test]
public async Task FanOut_MultipleConsumerGroups_EachReceivesAllMessages()
public async Task FanOut_MultipleConsumerGroups_EachReceivesAllMessages(CancellationToken cancellationToken)
{
// Simulate: order-events consumed by billing, shipping, and notification services
var topic = await KafkaContainer.CreateTestTopicAsync();
Expand Down Expand Up @@ -56,7 +60,7 @@ await producer.ProduceAsync(new ProducerMessage<string, string>
}

[Test]
public async Task FanOut_ConsumerGroupsAtDifferentSpeeds_IndependentProgress()
public async Task FanOut_ConsumerGroupsAtDifferentSpeeds_IndependentProgress(CancellationToken cancellationToken)
{
// Fast consumer reads all, slow consumer reads partial - they don't affect each other
var topic = await KafkaContainer.CreateTestTopicAsync();
Expand Down Expand Up @@ -131,7 +135,7 @@ await producer.ProduceAsync(new ProducerMessage<string, string>
}

[Test]
public async Task FanOut_NewConsumerGroupJoinsLate_GetsAllHistoricalMessages()
public async Task FanOut_NewConsumerGroupJoinsLate_GetsAllHistoricalMessages(CancellationToken cancellationToken)
{
// A new service joins after events have already been produced
var topic = await KafkaContainer.CreateTestTopicAsync();
Expand Down Expand Up @@ -177,7 +181,7 @@ await producer.ProduceAsync(new ProducerMessage<string, string>
}

[Test]
public async Task FanOut_ConcurrentConsumerGroups_AllConsumeSimultaneously()
public async Task FanOut_ConcurrentConsumerGroups_AllConsumeSimultaneously(CancellationToken cancellationToken)
{
// Multiple consumer groups consuming the same topic at the same time
var topic = await KafkaContainer.CreateTestTopicAsync(partitions: 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace Dekaf.Tests.Integration.RealWorld;
/// and that key-based partitioning consistently routes to the same partition.
/// </summary>
[Category("Messaging")]
[ParallelLimiter<RealWorldMessagingLimit>]
public sealed class MessageOrderingTests(KafkaTestContainer kafka) : KafkaIntegrationTest(kafka)
{
[Test]
Expand Down
13 changes: 13 additions & 0 deletions tests/Dekaf.Tests.Integration/RealWorld/RealWorldMessagingLimit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using TUnit.Core.Interfaces;

namespace Dekaf.Tests.Integration.RealWorld;

/// <summary>
/// Limits parallelism for RealWorld messaging tests (MessageOrdering, EventPipeline, FanOut).
/// These tests create multiple concurrent producers and consumers against shared Kafka containers,
/// which can overwhelm the broker under high parallelism on CI runners.
/// </summary>
public class RealWorldMessagingLimit : IParallelLimit
{
    /// <summary>Maximum number of these tests that may run concurrently.</summary>
    public int Limit
    {
        get { return 3; }
    }
}
2 changes: 1 addition & 1 deletion tests/Dekaf.Tests.Unit/Dekaf.Tests.Unit.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="*" />
<PackageReference Include="Microsoft.Testing.Extensions.HangDump" Version="*" />
<PackageReference Include="NSubstitute" Version="5.*" />
<PackageReference Include="TUnit" Version="1.13.60" />
<PackageReference Include="TUnit" Version="1.14.0" />
<PackageReference Include="Verify.TUnit" Version="*" />
</ItemGroup>

Expand Down
Loading
Loading