Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 7 additions & 31 deletions src/Transports/Kafka/Wolverine.Kafka/Internals/KafkaListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ public class KafkaListener : IListener, IDisposable, ISupportDeadLetterQueue
private readonly Task _runner;
private readonly IReceiver _receiver;
private readonly string? _messageTypeName;
private readonly QualityOfService _qualityOfService;
private readonly ILogger _logger;

public KafkaListener(KafkaTopic topic, ConsumerConfig config,
IConsumer<string, byte[]> consumer, IReceiver receiver,
ILogger<KafkaListener> logger)
Expand All @@ -34,32 +32,13 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config,
Config = config;
_receiver = receiver;

_qualityOfService = Config.EnableAutoCommit.HasValue && !Config.EnableAutoCommit.Value
? QualityOfService.AtMostOnce
: QualityOfService.AtLeastOnce;

_runner = Task.Run(async () =>
{
_consumer.Subscribe(topic.TopicName);
try
{
while (!_cancellation.IsCancellationRequested)
{
if (_qualityOfService == QualityOfService.AtMostOnce)
{
try
{
_consumer.Commit();
}
catch (KafkaException e)
{
if (!e.Message.Contains("No offset stored"))
{
throw;
}
}
}

try
{
var result = _consumer.Consume(_cancellation.Token);
Expand Down Expand Up @@ -87,7 +66,7 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config,
catch (Exception)
{
}

logger.LogError(e, "Error trying to map Kafka message to a Wolverine envelope");
}
}
Expand All @@ -109,16 +88,13 @@ public KafkaListener(KafkaTopic topic, ConsumerConfig config,

public ValueTask CompleteAsync(Envelope envelope)
{
if (_qualityOfService == QualityOfService.AtLeastOnce)
try
{
_consumer.Commit();
}
catch (Exception)
{
try
{
_consumer.Commit();
}
catch (Exception)
{

}
}
return ValueTask.CompletedTask;
}
Expand Down Expand Up @@ -192,4 +168,4 @@ public void Dispose()
_consumer.SafeDispose();
_runner.Dispose();
}
}
}
7 changes: 7 additions & 0 deletions src/Transports/Kafka/Wolverine.Kafka/KafkaTopic.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ public override ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtim
EnvelopeMapper ??= BuildMapper(runtime);

var config = GetEffectiveConsumerConfig();

if (Mode == EndpointMode.Durable)
{
config.EnableAutoCommit = false;
config.EnableAutoOffsetStore = false;
}

var listener = new KafkaListener(this, config,
Parent.CreateConsumer(config), receiver, runtime.LoggerFactory.CreateLogger<KafkaListener>());
return ValueTask.FromResult((IListener)listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ public class RedisStreamListener : IListener, ISupportDeadLetterQueue
private Task? _scheduledTask;
private ListeningStatus _status = ListeningStatus.Stopped;
private string _consumerName;



public RedisStreamListener(RedisTransport transport, RedisStreamEndpoint endpoint,
IWolverineRuntime runtime, IReceiver receiver)
{
Expand Down
34 changes: 34 additions & 0 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
private readonly RetryBlock<Envelope> _moveToErrors;
private readonly IBlock<Envelope> _receiver;
private readonly RetryBlock<Envelope> _receivingOne;
private readonly IWolverineRuntime _runtime;
private readonly RetryBlock<Envelope> _scheduleExecution;
private readonly DurabilitySettings _settings;

// These members are for draining
private bool _latched;
private int _inboxUnavailableSignaled;

public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPipeline pipeline)
{
_endpoint = endpoint;
_runtime = runtime;
_settings = runtime.DurabilitySettings;

// the check for Stores being null is honestly just because of some tests that use a little too much mocking
Expand Down Expand Up @@ -206,7 +209,7 @@
await EnqueueAsync(envelope);
}

public IHandlerPipeline? Pipeline { get; }

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

Check warning on line 212 in src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

View workflow job for this annotation

GitHub Actions / test

Nullability of reference types in return type of 'IHandlerPipeline? DurableReceiver.Pipeline.get' doesn't match implicitly implemented member 'IHandlerPipeline IReceiver.Pipeline.get' (possibly because of nullability attributes).

public Uri Uri { get; set; }

Expand Down Expand Up @@ -321,6 +324,31 @@
return _scheduleExecution.PostAsync(envelope);
}

internal void SignalInboxUnavailable()
{
if (Interlocked.CompareExchange(ref _inboxUnavailableSignaled, 1, 0) != 0) return;

_logger.LogWarning("Inbox database unavailable for {Uri}. Signaling listener to pause.", Uri);

// Fire-and-forget via Task.Run to avoid deadlock:
// We're on a RetryBlock thread; PauseForInboxRecoveryAsync drains that same RetryBlock.
_ = Task.Run(async () =>
{
try
{
var agent = _runtime.Endpoints.FindListeningAgent(Uri);
if (agent is ListeningAgent la)
{
await la.PauseForInboxRecoveryAsync();
}
}
catch (Exception e)
{
_logger.LogError(e, "Error signaling listener pause for inbox recovery at {Uri}", Uri);
}
});
}

private async Task receiveOneAsync(Envelope envelope)
{
if (_latched)
Expand Down Expand Up @@ -390,6 +418,11 @@

return;
}
catch (Exception)
{
SignalInboxUnavailable();
throw;
}
}

if (envelope.Status == EnvelopeStatus.Incoming)
Expand Down Expand Up @@ -468,6 +501,7 @@
catch (Exception e)
{
_logger.LogError(e, "Error trying to persist incoming envelopes at {Uri}", Uri);
SignalInboxUnavailable();

// Use finer grained retries on one envelope at a time, and this will also deal with
// duplicate detection
Expand Down
74 changes: 73 additions & 1 deletion src/Wolverine/Transports/ListeningAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ListeningAgent : IAsyncDisposable, IDisposable, IListeningAgent
private readonly HandlerPipeline _pipeline;
private readonly IWolverineRuntime _runtime;
private IReceiver? _receiver;
private Restarter? _restarter;
private IDisposable? _restarter;

public ListeningAgent(Endpoint endpoint, WolverineRuntime runtime)
{
Expand Down Expand Up @@ -229,6 +229,33 @@ public async ValueTask PauseAsync(TimeSpan pauseTime)

private readonly SemaphoreSlim _semaphore = new(1, 1);

public async ValueTask PauseForInboxRecoveryAsync()
{
if (Status != ListeningStatus.Accepting || Listener == null) return;

await _semaphore.WaitAsync();
if (Status != ListeningStatus.Accepting || Listener == null)
{
_semaphore.Release();
return;
}

try
{
await StopAndDrainAsync();
_circuitBreaker?.Reset();
_logger.LogWarning("Paused listener at {Uri} — inbox database unavailable", Uri);
_runtime.Tracker.Publish(new ListenerState(Uri, Endpoint.EndpointName, ListeningStatus.Stopped));

_restarter?.SafeDispose();
_restarter = new InboxHealthRestarter(this, _runtime, _logger);
}
finally
{
_semaphore.Release();
}
}

public async ValueTask MarkAsTooBusyAndStopReceivingAsync()
{
if (Status != ListeningStatus.Accepting || Listener == null)
Expand Down Expand Up @@ -320,6 +347,51 @@ public void Dispose()
}
}

internal class InboxHealthRestarter : IDisposable
{
private readonly CancellationTokenSource _cancellation = new();
private readonly Task _task;

public InboxHealthRestarter(IListenerCircuit parent, IWolverineRuntime runtime, ILogger logger)
{
_task = Task.Run(() => ProbeLoopAsync(parent, runtime, logger, _cancellation.Token));
}

private static async Task ProbeLoopAsync(
IListenerCircuit parent, IWolverineRuntime runtime, ILogger logger, CancellationToken ct)
{
var delay = TimeSpan.FromSeconds(2);
var maxDelay = TimeSpan.FromSeconds(30);

while (!ct.IsCancellationRequested)
{
try { await Task.Delay(delay, ct); }
catch (OperationCanceledException) { return; }

try
{
// Lightweight probe — releases 0 rows but exercises DB connection
await runtime.Storage.Inbox.ReleaseIncomingAsync(0, new Uri("wolverine://inbox-health-probe"));

logger.LogInformation("Inbox available again for {Uri}. Restarting listener.", parent.Endpoint.Uri);
await parent.StartAsync();
return;
}
catch (Exception e)
{
logger.LogWarning(e, "Inbox still unavailable for {Uri}. Retrying in {Delay}.", parent.Endpoint.Uri, delay);
delay = TimeSpan.FromMilliseconds(Math.Min(delay.TotalMilliseconds * 1.5, maxDelay.TotalMilliseconds));
}
}
}

public void Dispose()
{
_cancellation.Cancel();
_task.SafeDispose();
}
}

internal class RetryOnInlineChannelCallback : IListener
{
private readonly IListener _inner;
Expand Down
Loading