diff --git a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Incoming.cs b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Incoming.cs index 88972a3c0..0f53bf58a 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Incoming.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/Durability/RavenDbDurabilityAgent.Incoming.cs @@ -44,7 +44,7 @@ private async Task recoverMessagesForListener(Uri listener, IListenerCircuit cir var envelopes = await _parent.LoadPageOfGloballyOwnedIncomingAsync(listener, _settings.RecoveryBatchSize); await _parent.ReassignIncomingAsync(_settings.AssignedNodeNumber, envelopes); - circuit.EnqueueDirectly(envelopes); + await circuit.EnqueueDirectlyAsync(envelopes); _logger.RecoveredIncoming(envelopes); _logger.LogInformation("Successfully recovered {Count} messages from the inbox for listener {Listener}", diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_ReplayDeadLetterQueue.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1594_ReplayDeadLetterQueue.cs similarity index 80% rename from src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_ReplayDeadLetterQueue.cs rename to src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1594_ReplayDeadLetterQueue.cs index bc9731665..1427c8372 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_ReplayDeadLetterQueue.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1594_ReplayDeadLetterQueue.cs @@ -3,11 +3,14 @@ using System.Linq; using System.Threading.Tasks; using IntegrationTests; +using JasperFx.Core; +using JasperFx.Core.Reflection; using JasperFx.Resources; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Shouldly; using Wolverine; +using Wolverine.Configuration; using Wolverine.Persistence.Durability; using Wolverine.Persistence.Durability.DeadLetterManagement; using Wolverine.RabbitMQ; @@ -18,15 +21,16 @@ namespace Wolverine.RabbitMQ.Tests.Bugs; -public class Bug_ReplayDeadLetterQueue +public class Bug_1594_ReplayDeadLetterQueue { private readonly ITestOutputHelper _output; - public Bug_ReplayDeadLetterQueue(ITestOutputHelper output) { _output = output; ReplayTestHandler.Output = output; } + public Bug_1594_ReplayDeadLetterQueue(ITestOutputHelper output) { _output = output; ReplayTestHandler.Output = output; } [Theory] - [InlineData(false)] // non-durable - [InlineData(true)] // durable - public async Task can_replay_dead_letter_message(bool useDurableInbox) + [InlineData(EndpointMode.Inline)] + [InlineData(EndpointMode.Durable)] + [InlineData(EndpointMode.BufferedInMemory)] + public async Task can_replay_dead_letter_message(EndpointMode mode) { var queueName = $"replay-dlq-{Guid.NewGuid()}"; var connectionString = Servers.SqlServerConnectionString; @@ -42,27 +46,18 @@ public async Task can_replay_dead_letter_message(bool useDurableInbox) opts.Policies.AutoApplyTransactions(); opts.EnableAutomaticFailureAcks = false; opts.Durability.Mode = DurabilityMode.Solo; + opts.UseRabbitMq().DisableDeadLetterQueueing().AutoProvision().AutoPurgeOnStartup(); - if (useDurableInbox) - { - opts.ListenToRabbitQueue(queueName).UseDurableInbox(); - opts.PublishMessage().ToRabbitQueue(queueName); - } - else - { - opts.ListenToRabbitQueue(queueName); - opts.PublishMessage().ToRabbitQueue(queueName); - } + opts.PublishMessage().ToRabbitQueue(queueName); + opts.ListenToRabbitQueue(queueName, q => q.As().Mode = mode); + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); }).StartAsync(); await host.ResetResourceState(); - - using (var scope = host.Services.CreateScope()) - { - var bus = scope.ServiceProvider.GetRequiredService(); - await bus.PublishAsync(new ReplayTestMessage()); - } + + await host.MessageBus().PublishAsync(new ReplayTestMessage()); + await Task.Delay(1000); var messageStore = host.Services.GetRequiredService(); @@ -79,6 +74,7 @@ public async Task can_replay_dead_letter_message(bool useDurableInbox) } await Task.Delay(100); } + deadLetterId.ShouldNotBeNull("Message should be in DLQ after failure"); // Log state before replay @@ -92,7 +88,7 @@ public async Task can_replay_dead_letter_message(bool useDurableInbox) var tracked = await host .TrackActivity() .DoNotAssertOnExceptionsDetected() - .Timeout(TimeSpan.FromSeconds(10)) + .Timeout(60.Seconds()) .WaitForMessageToBeReceivedAt(host) .ExecuteAndWaitAsync((IMessageContext _) => messageStore.DeadLetters.MarkDeadLetterEnvelopesAsReplayableAsync(new[] { deadLetterId.Value })); @@ -109,7 +105,7 @@ public async Task can_replay_dead_letter_message(bool useDurableInbox) tracked.MessageSucceeded.SingleMessage() .ShouldNotBeNull("ReplayTestMessage should be successfully processed after replay"); afterReplay.DeadLetterEnvelopes.Any(dl => dl.Id == deadLetterId).ShouldBeFalse("Message should be removed from DLQ after successful replay (this should work for both durable and non-durable queues)"); - afterIncoming.Any(env => env.Id == deadLetterId).ShouldBeFalse("Message should not remain in Incoming after successful processing"); + afterIncoming.Any(env => env.Status == EnvelopeStatus.Incoming && env.Id == deadLetterId).ShouldBeFalse("Message should not remain in Incoming after successful processing"); } } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs index b40639a49..b30c53ad6 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqListener.cs @@ -55,6 +55,7 @@ internal class RabbitMqListener : RabbitMqChannelAgent, IListener, ISupportDeadL private readonly RabbitMqTransport _transport; private readonly IReceiver _receiver; private readonly Lazy _sender; + private string? _consumerId; public RabbitMqListener(IWolverineRuntime runtime, RabbitMqQueue queue, RabbitMqTransport transport, IReceiver receiver) : base(transport.UseSenderConnectionOnly ? transport.SendingConnection : transport.ListeningConnection, @@ -62,6 +63,7 @@ public RabbitMqListener(IWolverineRuntime runtime, { Queue = queue; Address = queue.Uri; + ConsumerAddress = Address; _sender = new Lazy(() => Queue.ResolveSender(runtime)); _cancellation.Register(() => @@ -215,7 +217,24 @@ public async Task CompleteAsync(ulong deliveryTag) await Channel!.BasicAckAsync(deliveryTag, true, _cancellation); } - public string? ConsumerId { get; set; } + public string? ConsumerId + { + get => _consumerId; + set + { + _consumerId = value; + + if (value == null) + { + ConsumerAddress = Address; + } + else + { + ConsumerAddress = new Uri($"{Address}?consumer={_consumerId}"); + } + } + } + public Uri BaseAddress => Queue.Uri; - public Uri ConsumerAddress => new Uri($"{Queue.Uri}?consumer={ConsumerId}"); + public Uri ConsumerAddress { get; private set; } } diff --git a/src/Wolverine/Persistence/Durability/RecoverIncomingMessagesCommand.cs b/src/Wolverine/Persistence/Durability/RecoverIncomingMessagesCommand.cs index 2b8752eef..12c1b7e7d 100644 --- a/src/Wolverine/Persistence/Durability/RecoverIncomingMessagesCommand.cs +++ b/src/Wolverine/Persistence/Durability/RecoverIncomingMessagesCommand.cs @@ -39,7 +39,7 @@ public async Task ExecuteAsync(IWolverineRuntime runtime, var envelopes = await _store.LoadPageOfGloballyOwnedIncomingAsync(_count.Destination, pageSize); await _store.ReassignIncomingAsync(_settings.AssignedNodeNumber, envelopes); - _circuit.EnqueueDirectly(envelopes); + await _circuit.EnqueueDirectlyAsync(envelopes); _logger.RecoveredIncoming(envelopes); _logger.LogInformation("Successfully recovered {Count} messages from the inbox for listener {Listener}", diff --git a/src/Wolverine/Transports/ListeningAgent.cs b/src/Wolverine/Transports/ListeningAgent.cs index 1ea9a15c7..9db1f824e 100644 --- a/src/Wolverine/Transports/ListeningAgent.cs +++ b/src/Wolverine/Transports/ListeningAgent.cs @@ -16,7 +16,7 @@ public interface IListenerCircuit ValueTask PauseAsync(TimeSpan pauseTime); ValueTask StartAsync(); - void EnqueueDirectly(IEnumerable envelopes); + Task EnqueueDirectlyAsync(IEnumerable envelopes); } public interface IListeningAgent : IListenerCircuit @@ -99,9 +99,14 @@ public void Dispose() public int QueueCount => _receiver is ILocalQueue q ? q.QueueCount : 0; - public void EnqueueDirectly(IEnumerable envelopes) + public async Task EnqueueDirectlyAsync(IEnumerable envelopes) { - if (_receiver is ILocalQueue queue) + if (_receiver is BufferedReceiver) + { + // Agent is latched if listener is null + await _receiver.ReceivedAsync(new RetryOnInlineChannelCallback(Listener, _runtime), envelopes.ToArray()); + } + else if (_receiver is ILocalQueue queue) { var uniqueNodeId = _runtime.DurabilitySettings.AssignedNodeNumber; foreach (var envelope in envelopes) @@ -110,6 +115,11 @@ public void EnqueueDirectly(IEnumerable envelopes) queue.Enqueue(envelope); } } + else if (_receiver is InlineReceiver inline) + { + // Agent is latched if listener is null + await inline.ReceivedAsync(new RetryOnInlineChannelCallback(Listener, _runtime), envelopes.ToArray()); + } else { throw new InvalidOperationException("There is no active, local queue for this listening endpoint at " + @@ -290,4 +300,47 @@ public void Dispose() _cancellation.Cancel(); _task.SafeDispose(); } +} + +internal class RetryOnInlineChannelCallback : IListener +{ + private readonly IListener _inner; + private readonly IWolverineRuntime _runtime; + + public RetryOnInlineChannelCallback(IListener inner, IWolverineRuntime runtime) + { + _inner = inner; + _runtime = runtime; + } + + public IHandlerPipeline? Pipeline => _inner.Pipeline; + public async ValueTask CompleteAsync(Envelope envelope) + { + try + { + await _runtime.Storage.Inbox.MarkIncomingEnvelopeAsHandledAsync(envelope); + } + catch (Exception e) + { + _runtime.Logger.LogError(e, "Error trying to mark a message as handled in the transactional inbox"); + } + + await _inner.CompleteAsync(envelope); + } + + public ValueTask DeferAsync(Envelope envelope) + { + return _inner.DeferAsync(envelope); + } + + public ValueTask DisposeAsync() + { + return new ValueTask(); + } + + public Uri Address => _inner.Address; + public ValueTask StopAsync() + { + return new ValueTask(); + } } \ No newline at end of file diff --git a/src/Wolverine/Transports/Local/BufferedLocalQueue.cs b/src/Wolverine/Transports/Local/BufferedLocalQueue.cs index 94882e7f1..192233eee 100644 --- a/src/Wolverine/Transports/Local/BufferedLocalQueue.cs +++ b/src/Wolverine/Transports/Local/BufferedLocalQueue.cs @@ -31,12 +31,14 @@ ValueTask IListenerCircuit.StartAsync() return ValueTask.CompletedTask; } - void IListenerCircuit.EnqueueDirectly(IEnumerable envelopes) + Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable envelopes) { foreach (var envelope in envelopes) { EnqueueDirectly(envelope); } + + return Task.CompletedTask; } public Uri Destination { get; } diff --git a/src/Wolverine/Transports/Local/DurableLocalQueue.cs b/src/Wolverine/Transports/Local/DurableLocalQueue.cs index 197cfa091..d85fc25ee 100644 --- a/src/Wolverine/Transports/Local/DurableLocalQueue.cs +++ b/src/Wolverine/Transports/Local/DurableLocalQueue.cs @@ -68,14 +68,16 @@ public DurableLocalQueue(Endpoint endpoint, WolverineRuntime runtime) int IListenerCircuit.QueueCount => _receiver?.QueueCount ?? 0; - void IListenerCircuit.EnqueueDirectly(IEnumerable envelopes) + Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable envelopes) { if (_receiver == null) { - return; + return Task.CompletedTask; } foreach (var envelope in envelopes) _receiver.Enqueue(envelope); + + return Task.CompletedTask; } public async ValueTask PauseAsync(TimeSpan pauseTime)