diff --git a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs index 3221bfdeb..9f02d9ded 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs @@ -16,9 +16,7 @@ public class KafkaListener : IListener, IDisposable, ISupportDeadLetterQueue private readonly Task _runner; private readonly IReceiver _receiver; private readonly string? _messageTypeName; - private readonly QualityOfService _qualityOfService; private readonly ILogger _logger; - public KafkaListener(KafkaTopic topic, ConsumerConfig config, IConsumer consumer, IReceiver receiver, ILogger logger) @@ -34,10 +32,6 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, Config = config; _receiver = receiver; - _qualityOfService = Config.EnableAutoCommit.HasValue && !Config.EnableAutoCommit.Value - ? QualityOfService.AtMostOnce - : QualityOfService.AtLeastOnce; - _runner = Task.Run(async () => { _consumer.Subscribe(topic.TopicName); @@ -45,21 +39,6 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, { while (!_cancellation.IsCancellationRequested) { - if (_qualityOfService == QualityOfService.AtMostOnce) - { - try - { - _consumer.Commit(); - } - catch (KafkaException e) - { - if (!e.Message.Contains("No offset stored")) - { - throw; - } - } - } - try { var result = _consumer.Consume(_cancellation.Token); @@ -87,7 +66,7 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, catch (Exception) { } - + logger.LogError(e, "Error trying to map Kafka message to a Wolverine envelope"); } } @@ -109,16 +88,13 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config, public ValueTask CompleteAsync(Envelope envelope) { - if (_qualityOfService == QualityOfService.AtLeastOnce) + try + { + _consumer.Commit(); + } + catch (Exception) { - try - { - _consumer.Commit(); - } - catch (Exception) - { - } } return ValueTask.CompletedTask; } @@ -192,4 +168,4 @@ public void Dispose() _consumer.SafeDispose(); _runner.Dispose(); } -} \ No newline at end of file +} diff --git a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs index 87e2edaa0..23469c58d 100644 --- a/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs +++ b/src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs @@ -94,6 +94,13 @@ public override ValueTask BuildListenerAsync(IWolverineRuntime runtim EnvelopeMapper ??= BuildMapper(runtime); var config = GetEffectiveConsumerConfig(); + + if (Mode == EndpointMode.Durable) + { + config.EnableAutoCommit = false; + config.EnableAutoOffsetStore = false; + } + var listener = new KafkaListener(this, config, Parent.CreateConsumer(config), receiver, runtime.LoggerFactory.CreateLogger()); return ValueTask.FromResult((IListener)listener); diff --git a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs index 076a55b6b..fd4dafdb9 100644 --- a/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs +++ b/src/Transports/Redis/Wolverine.Redis/Internal/RedisStreamListener.cs @@ -24,9 +24,6 @@ public class RedisStreamListener : IListener, ISupportDeadLetterQueue private Task? _scheduledTask; private ListeningStatus _status = ListeningStatus.Stopped; private string _consumerName; - - - public RedisStreamListener(RedisTransport transport, RedisStreamEndpoint endpoint, IWolverineRuntime runtime, IReceiver receiver) { diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 34694507d..07a23641d 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -29,15 +29,18 @@ public class DurableReceiver : ILocalQueue, IChannelCallback, ISupportNativeSche private readonly RetryBlock _moveToErrors; private readonly IBlock _receiver; private readonly RetryBlock _receivingOne; + private readonly IWolverineRuntime _runtime; private readonly RetryBlock _scheduleExecution; private readonly DurabilitySettings _settings; // These members are for draining private bool _latched; + private int _inboxUnavailableSignaled; public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPipeline pipeline) { _endpoint = endpoint; + _runtime = runtime; _settings = runtime.DurabilitySettings; // the check for Stores being null is honestly just because of some tests that use a little too much mocking @@ -321,6 +324,31 @@ public Task MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time) return _scheduleExecution.PostAsync(envelope); } + internal void SignalInboxUnavailable() + { + if (Interlocked.CompareExchange(ref _inboxUnavailableSignaled, 1, 0) != 0) return; + + _logger.LogWarning("Inbox database unavailable for {Uri}. Signaling listener to pause.", Uri); + + // Fire-and-forget via Task.Run to avoid deadlock: + // We're on a RetryBlock thread; PauseForInboxRecoveryAsync drains that same RetryBlock. + _ = Task.Run(async () => + { + try + { + var agent = _runtime.Endpoints.FindListeningAgent(Uri); + if (agent is ListeningAgent la) + { + await la.PauseForInboxRecoveryAsync(); + } + } + catch (Exception e) + { + _logger.LogError(e, "Error signaling listener pause for inbox recovery at {Uri}", Uri); + } + }); + } + private async Task receiveOneAsync(Envelope envelope) { if (_latched) @@ -390,6 +418,11 @@ await executeWithRetriesAsync(async () => return; } + catch (Exception) + { + SignalInboxUnavailable(); + throw; + } } if (envelope.Status == EnvelopeStatus.Incoming) @@ -468,6 +501,7 @@ public async ValueTask ProcessReceivedMessagesAsync(DateTimeOffset now, IListene catch (Exception e) { _logger.LogError(e, "Error trying to persist incoming envelopes at {Uri}", Uri); + SignalInboxUnavailable(); // Use finer grained retries on one envelope at a time, and this will also deal with // duplicate detection diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index cd479a6ce..d824ca1c4 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -36,7 +36,7 @@ public class ListeningAgent : IAsyncDisposable, IDisposable, IListeningAgent private readonly HandlerPipeline _pipeline; private readonly IWolverineRuntime _runtime; private IReceiver? _receiver; - private Restarter? _restarter; + private IDisposable? _restarter; public ListeningAgent(Endpoint endpoint, WolverineRuntime runtime) { @@ -229,6 +229,33 @@ public async ValueTask PauseAsync(TimeSpan pauseTime) private readonly SemaphoreSlim _semaphore = new(1, 1); + public async ValueTask PauseForInboxRecoveryAsync() + { + if (Status != ListeningStatus.Accepting || Listener == null) return; + + await _semaphore.WaitAsync(); + if (Status != ListeningStatus.Accepting || Listener == null) + { + _semaphore.Release(); + return; + } + + try + { + await StopAndDrainAsync(); + _circuitBreaker?.Reset(); + _logger.LogWarning("Paused listener at {Uri} — inbox database unavailable", Uri); + _runtime.Tracker.Publish(new ListenerState(Uri, Endpoint.EndpointName, ListeningStatus.Stopped)); + + _restarter?.SafeDispose(); + _restarter = new InboxHealthRestarter(this, _runtime, _logger); + } + finally + { + _semaphore.Release(); + } + } + public async ValueTask MarkAsTooBusyAndStopReceivingAsync() { if (Status != ListeningStatus.Accepting || Listener == null) @@ -320,6 +347,51 @@ public void Dispose() } } +internal class InboxHealthRestarter : IDisposable +{ + private readonly CancellationTokenSource _cancellation = new(); + private readonly Task _task; + + public InboxHealthRestarter(IListenerCircuit parent, IWolverineRuntime runtime, ILogger logger) + { + _task = Task.Run(() => ProbeLoopAsync(parent, runtime, logger, _cancellation.Token)); + } + + private static async Task ProbeLoopAsync( + IListenerCircuit parent, IWolverineRuntime runtime, ILogger logger, CancellationToken ct) + { + var delay = TimeSpan.FromSeconds(2); + var maxDelay = TimeSpan.FromSeconds(30); + + while (!ct.IsCancellationRequested) + { + try { await Task.Delay(delay, ct); } + catch (OperationCanceledException) { return; } + + try + { + // Lightweight probe — releases 0 rows but exercises DB connection + await runtime.Storage.Inbox.ReleaseIncomingAsync(0, new Uri("wolverine://inbox-health-probe")); + + logger.LogInformation("Inbox available again for {Uri}. Restarting listener.", parent.Endpoint.Uri); + await parent.StartAsync(); + return; + } + catch (Exception e) + { + logger.LogWarning(e, "Inbox still unavailable for {Uri}. Retrying in {Delay}.", parent.Endpoint.Uri, delay); + delay = TimeSpan.FromMilliseconds(Math.Min(delay.TotalMilliseconds * 1.5, maxDelay.TotalMilliseconds)); + } + } + } + + public void Dispose() + { + _cancellation.Cancel(); + _task.SafeDispose(); + } +} + internal class RetryOnInlineChannelCallback : IListener { private readonly IListener _inner;