From b67b0d3789430d8a04386d005d6816c674ccadff Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 16 Feb 2026 07:58:33 -0600 Subject: [PATCH 1/3] Add ISupportConsumerPause to RabbitMqListener Implements backpressure support for RabbitMQ when the durable inbox database is unavailable. The pause check is in WorkerQueueMessageConsumer's HandleBasicDeliverAsync to block message delivery while paused. Co-Authored-By: Claude Opus 4.6 --- .../Wolverine.RabbitMQ/Internal/RabbitMqListener.cs | 13 ++++++++++++- .../Internal/WorkerQueueMessageConsumer.cs | 5 +++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs index deaa21b27..52a7344ca 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs @@ -45,7 +45,7 @@ public async Task MoveToErrorsAsync(Envelope envelope, Exception exception) public bool NativeDeadLetterQueueEnabled => true; } -internal class RabbitMqListener : RabbitMqChannelAgent, IListener, ISupportDeadLetterQueue, ISupportMultipleConsumers +internal class RabbitMqListener : RabbitMqChannelAgent, IListener, ISupportDeadLetterQueue, ISupportMultipleConsumers, ISupportConsumerPause { private readonly IChannelCallback _callback; private readonly CancellationToken _cancellation = CancellationToken.None; @@ -56,6 +56,7 @@ internal class RabbitMqListener : RabbitMqChannelAgent, IListener, ISupportDeadL private readonly RabbitMqTransport _transport; private WorkerQueueMessageConsumer? _consumer; private string? 
_consumerId; + internal volatile bool Paused; public RabbitMqListener(IWolverineRuntime runtime, RabbitMqQueue queue, RabbitMqTransport transport, IReceiver receiver) : base( @@ -142,6 +143,16 @@ public Task MoveToErrorsAsync(Envelope envelope, Exception exception) public bool NativeDeadLetterQueueEnabled { get; } + public void PauseConsuming() + { + Paused = true; + } + + public void ResumeConsuming() + { + Paused = false; + } + public string? ConsumerId { get => _consumerId; diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs index ec8731177..0b1121eaa 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs @@ -42,6 +42,11 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del return; } + while (_listener.Paused && !_cancellation.IsCancellationRequested) + { + await Task.Delay(250, _cancellation); + } + var envelope = new RabbitMqEnvelope(_listener, deliveryTag); try From 8224c1b313c096bdbeafe52e098c5f71d44bdfc9 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 16 Feb 2026 07:59:06 -0600 Subject: [PATCH 2/3] Add ISupportConsumerPause to RedisStreamListener Implements backpressure support for Redis streams when the durable inbox database is unavailable. Adds a polling wait (a 250 ms Task.Delay loop) in the ConsumerLoop before each read cycle. 
Co-Authored-By: Claude Opus 4.6 --- .../Internal/RedisStreamListener.cs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs index 076a55b6b..ba7203ae8 100644 --- a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs @@ -10,7 +10,7 @@ namespace Wolverine.Redis.Internal; -public class RedisStreamListener : IListener, ISupportDeadLetterQueue +public class RedisStreamListener : IListener, ISupportDeadLetterQueue, ISupportConsumerPause { private readonly RedisTransport _transport; private readonly RedisStreamEndpoint _endpoint; @@ -24,6 +24,7 @@ public class RedisStreamListener : IListener, ISupportDeadLetterQueue private Task? _scheduledTask; private ListeningStatus _status = ListeningStatus.Stopped; private string _consumerName; + private volatile bool _paused; @@ -48,6 +49,16 @@ public RedisStreamListener(RedisTransport transport, RedisStreamEndpoint endpoin public ListeningStatus Status => _status; public IHandlerPipeline? Pipeline => _receiver.Pipeline; + public void PauseConsuming() + { + _paused = true; + } + + public void ResumeConsuming() + { + _paused = false; + } + internal bool DeleteOnAck => _transport.DeleteStreamEntryOnAck; // ISupportDeadLetterQueue implementation @@ -325,6 +336,11 @@ private async Task ConsumerLoop() { while (!_cancellation.Token.IsCancellationRequested && _status == ListeningStatus.Accepting) { + while (_paused && !_cancellation.Token.IsCancellationRequested) + { + await Task.Delay(250, _cancellation.Token); + } + try { // Determine if it's time to use AutoClaim instead of regular read From 2b7c1e59084d57bffe9096375aafb4ee975aa3e3 Mon Sep 17 00:00:00 2001 From: "Jeremy D. 
Miller" Date: Mon, 16 Feb 2026 08:44:08 -0600 Subject: [PATCH 3/3] Fix durable inbox message loss by pausing listeners when inbox database is unavailable Replace per-listener ISupportConsumerPause approach with centralized coordination through ListeningAgent. When DurableReceiver detects inbox database failure, it signals the ListeningAgent to stop the listener and drain in-flight messages. A new InboxHealthRestarter probes the database with exponential backoff and restarts the listener when connectivity is restored. Also forces Kafka to disable auto-commit in durable mode to prevent offset advancement on unconsumed messages. Co-Authored-By: Claude Opus 4.6 --- .../Internals/KafkaListener.cs | 38 ++-------- .../Kafka/Wolverine.Kafka/KafkaTopic.cs | 7 ++ .../Internal/RabbitMqListener.cs | 13 +--- .../Internal/WorkerQueueMessageConsumer.cs | 5 -- .../Internal/RedisStreamListener.cs | 21 +----- .../Runtime/WorkerQueues/DurableReceiver.cs | 34 +++++++++ src/Wolverine/Transports/ListeningAgent.cs | 74 ++++++++++++++++++- 7 files changed, 123 insertions(+), 69 deletions(-) diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs index 3221bfdeb..9f02d9ded 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs @@ -16,9 +16,7 @@ public class KafkaListener : IListener, IDisposable, ISupportDeadLetterQueue private readonly Task _runner; private readonly IReceiver _receiver; private readonly string? 
_messageTypeName; - private readonly QualityOfService _qualityOfService; private readonly ILogger _logger; - public KafkaListener(KafkaTopic topic, ConsumerConfig config, IConsumer consumer, IReceiver receiver, ILogger logger) @@ -34,10 +32,6 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, Config = config; _receiver = receiver; - _qualityOfService = Config.EnableAutoCommit.HasValue && !Config.EnableAutoCommit.Value - ? QualityOfService.AtMostOnce - : QualityOfService.AtLeastOnce; - _runner = Task.Run(async () => { _consumer.Subscribe(topic.TopicName); @@ -45,21 +39,6 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, { while (!_cancellation.IsCancellationRequested) { - if (_qualityOfService == QualityOfService.AtMostOnce) - { - try - { - _consumer.Commit(); - } - catch (KafkaException e) - { - if (!e.Message.Contains("No offset stored")) - { - throw; - } - } - } - try { var result = _consumer.Consume(_cancellation.Token); @@ -87,7 +66,7 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, catch (Exception) { } - + logger.LogError(e, "Error trying to map Kafka message to a Wolverine envelope"); } } @@ -109,16 +88,13 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, public ValueTask CompleteAsync(Envelope envelope) { - if (_qualityOfService == QualityOfService.AtLeastOnce) + try + { + _consumer.Commit(); + } + catch (Exception) { - try - { - _consumer.Commit(); - } - catch (Exception) - { - } } return ValueTask.CompletedTask; } @@ -192,4 +168,4 @@ public void Dispose() _consumer.SafeDispose(); _runner.Dispose(); } -} \ No newline at end of file +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index 87e2edaa0..23469c58d 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -94,6 +94,13 @@ public override ValueTask BuildListenerAsync(IWolverineRuntime runtim EnvelopeMapper 
??= BuildMapper(runtime); var config = GetEffectiveConsumerConfig(); + + if (Mode == EndpointMode.Durable) + { + config.EnableAutoCommit = false; + config.EnableAutoOffsetStore = false; + } + var listener = new KafkaListener(this, config, Parent.CreateConsumer(config), receiver, runtime.LoggerFactory.CreateLogger()); return ValueTask.FromResult((IListener)listener); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs index 52a7344ca..deaa21b27 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs @@ -45,7 +45,7 @@ public async Task MoveToErrorsAsync(Envelope envelope, Exception exception) public bool NativeDeadLetterQueueEnabled => true; } -internal class RabbitMqListener : RabbitMqChannelAgent, IListener, ISupportDeadLetterQueue, ISupportMultipleConsumers, ISupportConsumerPause +internal class RabbitMqListener : RabbitMqChannelAgent, IListener, ISupportDeadLetterQueue, ISupportMultipleConsumers { private readonly IChannelCallback _callback; private readonly CancellationToken _cancellation = CancellationToken.None; @@ -56,7 +56,6 @@ internal class RabbitMqListener : RabbitMqChannelAgent, IListener, ISupportDeadL private readonly RabbitMqTransport _transport; private WorkerQueueMessageConsumer? _consumer; private string? _consumerId; - internal volatile bool Paused; public RabbitMqListener(IWolverineRuntime runtime, RabbitMqQueue queue, RabbitMqTransport transport, IReceiver receiver) : base( @@ -143,16 +142,6 @@ public Task MoveToErrorsAsync(Envelope envelope, Exception exception) public bool NativeDeadLetterQueueEnabled { get; } - public void PauseConsuming() - { - Paused = true; - } - - public void ResumeConsuming() - { - Paused = false; - } - public string? 
ConsumerId { get => _consumerId; diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs index 0b1121eaa..ec8731177 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/WorkerQueueMessageConsumer.cs @@ -42,11 +42,6 @@ public override async Task HandleBasicDeliverAsync(string consumerTag, ulong del return; } - while (_listener.Paused && !_cancellation.IsCancellationRequested) - { - await Task.Delay(250, _cancellation); - } - var envelope = new RabbitMqEnvelope(_listener, deliveryTag); try diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs index ba7203ae8..fd4dafdb9 100644 --- a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs @@ -10,7 +10,7 @@ namespace Wolverine.Redis.Internal; -public class RedisStreamListener : IListener, ISupportDeadLetterQueue, ISupportConsumerPause +public class RedisStreamListener : IListener, ISupportDeadLetterQueue { private readonly RedisTransport _transport; private readonly RedisStreamEndpoint _endpoint; @@ -24,10 +24,6 @@ public class RedisStreamListener : IListener, ISupportDeadLetterQueue, ISupportC private Task? _scheduledTask; private ListeningStatus _status = ListeningStatus.Stopped; private string _consumerName; - private volatile bool _paused; - - - public RedisStreamListener(RedisTransport transport, RedisStreamEndpoint endpoint, IWolverineRuntime runtime, IReceiver receiver) { @@ -49,16 +45,6 @@ public RedisStreamListener(RedisTransport transport, RedisStreamEndpoint endpoin public ListeningStatus Status => _status; public IHandlerPipeline? 
Pipeline => _receiver.Pipeline; - public void PauseConsuming() - { - _paused = true; - } - - public void ResumeConsuming() - { - _paused = false; - } - internal bool DeleteOnAck => _transport.DeleteStreamEntryOnAck; // ISupportDeadLetterQueue implementation @@ -336,11 +322,6 @@ private async Task ConsumerLoop() { while (!_cancellation.Token.IsCancellationRequested && _status == ListeningStatus.Accepting) { - while (_paused && !_cancellation.Token.IsCancellationRequested) - { - await Task.Delay(250, _cancellation.Token); - } - try { // Determine if it's time to use AutoClaim instead of regular read diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 34694507d..07a23641d 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -29,15 +29,18 @@ public class DurableReceiver : ILocalQueue, IChannelCallback, ISupportNativeSche private readonly RetryBlock _moveToErrors; private readonly IBlock _receiver; private readonly RetryBlock _receivingOne; + private readonly IWolverineRuntime _runtime; private readonly RetryBlock _scheduleExecution; private readonly DurabilitySettings _settings; // These members are for draining private bool _latched; + private int _inboxUnavailableSignaled; public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPipeline pipeline) { _endpoint = endpoint; + _runtime = runtime; _settings = runtime.DurabilitySettings; // the check for Stores being null is honestly just because of some tests that use a little too much mocking @@ -321,6 +324,31 @@ public Task MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time) return _scheduleExecution.PostAsync(envelope); } + internal void SignalInboxUnavailable() + { + if (Interlocked.CompareExchange(ref _inboxUnavailableSignaled, 1, 0) != 0) return; + + _logger.LogWarning("Inbox database unavailable for {Uri}. 
Signaling listener to pause.", Uri); + + // Fire-and-forget via Task.Run to avoid deadlock: + // We're on a RetryBlock thread; PauseForInboxRecoveryAsync drains that same RetryBlock. + _ = Task.Run(async () => + { + try + { + var agent = _runtime.Endpoints.FindListeningAgent(Uri); + if (agent is ListeningAgent la) + { + await la.PauseForInboxRecoveryAsync(); + } + } + catch (Exception e) + { + _logger.LogError(e, "Error signaling listener pause for inbox recovery at {Uri}", Uri); + } + }); + } + private async Task receiveOneAsync(Envelope envelope) { if (_latched) @@ -390,6 +418,11 @@ await executeWithRetriesAsync(async () => return; } + catch (Exception) + { + SignalInboxUnavailable(); + throw; + } } if (envelope.Status == EnvelopeStatus.Incoming) @@ -468,6 +501,7 @@ public async ValueTask ProcessReceivedMessagesAsync(DateTimeOffset now, IListene catch (Exception e) { _logger.LogError(e, "Error trying to persist incoming envelopes at {Uri}", Uri); + SignalInboxUnavailable(); // Use finer grained retries on one envelope at a time, and this will also deal with // duplicate detection diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index cd479a6ce..d824ca1c4 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -36,7 +36,7 @@ public class ListeningAgent : IAsyncDisposable, IDisposable, IListeningAgent private readonly HandlerPipeline _pipeline; private readonly IWolverineRuntime _runtime; private IReceiver? _receiver; - private Restarter? _restarter; + private IDisposable? 
_restarter; public ListeningAgent(Endpoint endpoint, WolverineRuntime runtime) { @@ -229,6 +229,33 @@ public async ValueTask PauseAsync(TimeSpan pauseTime) private readonly SemaphoreSlim _semaphore = new(1, 1); + public async ValueTask PauseForInboxRecoveryAsync() + { + if (Status != ListeningStatus.Accepting || Listener == null) return; + + await _semaphore.WaitAsync(); + if (Status != ListeningStatus.Accepting || Listener == null) + { + _semaphore.Release(); + return; + } + + try + { + await StopAndDrainAsync(); + _circuitBreaker?.Reset(); + _logger.LogWarning("Paused listener at {Uri} — inbox database unavailable", Uri); + _runtime.Tracker.Publish(new ListenerState(Uri, Endpoint.EndpointName, ListeningStatus.Stopped)); + + _restarter?.SafeDispose(); + _restarter = new InboxHealthRestarter(this, _runtime, _logger); + } + finally + { + _semaphore.Release(); + } + } + public async ValueTask MarkAsTooBusyAndStopReceivingAsync() { if (Status != ListeningStatus.Accepting || Listener == null) @@ -320,6 +347,51 @@ public void Dispose() } } +internal class InboxHealthRestarter : IDisposable +{ + private readonly CancellationTokenSource _cancellation = new(); + private readonly Task _task; + + public InboxHealthRestarter(IListenerCircuit parent, IWolverineRuntime runtime, ILogger logger) + { + _task = Task.Run(() => ProbeLoopAsync(parent, runtime, logger, _cancellation.Token)); + } + + private static async Task ProbeLoopAsync( + IListenerCircuit parent, IWolverineRuntime runtime, ILogger logger, CancellationToken ct) + { + var delay = TimeSpan.FromSeconds(2); + var maxDelay = TimeSpan.FromSeconds(30); + + while (!ct.IsCancellationRequested) + { + try { await Task.Delay(delay, ct); } + catch (OperationCanceledException) { return; } + + try + { + // Lightweight probe — releases 0 rows but exercises DB connection + await runtime.Storage.Inbox.ReleaseIncomingAsync(0, new Uri("wolverine://inbox-health-probe")); + + logger.LogInformation("Inbox available again for {Uri}. 
Restarting listener.", parent.Endpoint.Uri); + await parent.StartAsync(); + return; + } + catch (Exception e) + { + logger.LogWarning(e, "Inbox still unavailable for {Uri}. Retrying in {Delay}.", parent.Endpoint.Uri, delay); + delay = TimeSpan.FromMilliseconds(Math.Min(delay.TotalMilliseconds * 1.5, maxDelay.TotalMilliseconds)); + } + } + } + + public void Dispose() + { + _cancellation.Cancel(); + _task.SafeDispose(); + } +} + internal class RetryOnInlineChannelCallback : IListener { private readonly IListener _inner;