Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private async Task recoverMessagesForListener(Uri listener, IListenerCircuit cir
var envelopes = await _parent.LoadPageOfGloballyOwnedIncomingAsync(listener, _settings.RecoveryBatchSize);
await _parent.ReassignIncomingAsync(_settings.AssignedNodeNumber, envelopes);

circuit.EnqueueDirectly(envelopes);
await circuit.EnqueueDirectlyAsync(envelopes);
_logger.RecoveredIncoming(envelopes);

_logger.LogInformation("Successfully recovered {Count} messages from the inbox for listener {Listener}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
using System.Linq;
using System.Threading.Tasks;
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.Resources;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Configuration;
using Wolverine.Persistence.Durability;
using Wolverine.Persistence.Durability.DeadLetterManagement;
using Wolverine.RabbitMQ;
Expand All @@ -18,15 +21,16 @@

namespace Wolverine.RabbitMQ.Tests.Bugs;

public class Bug_ReplayDeadLetterQueue
public class Bug_1594_ReplayDeadLetterQueue
{
private readonly ITestOutputHelper _output;
public Bug_ReplayDeadLetterQueue(ITestOutputHelper output) { _output = output; ReplayTestHandler.Output = output; }
public Bug_1594_ReplayDeadLetterQueue(ITestOutputHelper output) { _output = output; ReplayTestHandler.Output = output; }

[Theory]
[InlineData(false)] // non-durable
[InlineData(true)] // durable
public async Task can_replay_dead_letter_message(bool useDurableInbox)
[InlineData(EndpointMode.Inline)]
[InlineData(EndpointMode.Durable)]
[InlineData(EndpointMode.BufferedInMemory)]
public async Task can_replay_dead_letter_message(EndpointMode mode)
{
var queueName = $"replay-dlq-{Guid.NewGuid()}";
var connectionString = Servers.SqlServerConnectionString;
Expand All @@ -42,27 +46,18 @@ public async Task can_replay_dead_letter_message(bool useDurableInbox)
opts.Policies.AutoApplyTransactions();
opts.EnableAutomaticFailureAcks = false;
opts.Durability.Mode = DurabilityMode.Solo;

opts.UseRabbitMq().DisableDeadLetterQueueing().AutoProvision().AutoPurgeOnStartup();
if (useDurableInbox)
{
opts.ListenToRabbitQueue(queueName).UseDurableInbox();
opts.PublishMessage<ReplayTestMessage>().ToRabbitQueue(queueName);
}
else
{
opts.ListenToRabbitQueue(queueName);
opts.PublishMessage<ReplayTestMessage>().ToRabbitQueue(queueName);
}
opts.PublishMessage<ReplayTestMessage>().ToRabbitQueue(queueName);
opts.ListenToRabbitQueue(queueName, q => q.As<Endpoint>().Mode = mode);

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);
}).StartAsync();

await host.ResetResourceState();

using (var scope = host.Services.CreateScope())
{
var bus = scope.ServiceProvider.GetRequiredService<IMessageBus>();
await bus.PublishAsync(new ReplayTestMessage());
}

await host.MessageBus().PublishAsync(new ReplayTestMessage());

await Task.Delay(1000);

var messageStore = host.Services.GetRequiredService<IMessageStore>();
Expand All @@ -79,6 +74,7 @@ public async Task can_replay_dead_letter_message(bool useDurableInbox)
}
await Task.Delay(100);
}

deadLetterId.ShouldNotBeNull("Message should be in DLQ after failure");

// Log state before replay
Expand All @@ -92,7 +88,7 @@ public async Task can_replay_dead_letter_message(bool useDurableInbox)
var tracked = await host
.TrackActivity()
.DoNotAssertOnExceptionsDetected()
.Timeout(TimeSpan.FromSeconds(10))
.Timeout(60.Seconds())
.WaitForMessageToBeReceivedAt<ReplayTestMessage>(host)
.ExecuteAndWaitAsync((IMessageContext _) => messageStore.DeadLetters.MarkDeadLetterEnvelopesAsReplayableAsync(new[] { deadLetterId.Value }));

Expand All @@ -109,7 +105,7 @@ public async Task can_replay_dead_letter_message(bool useDurableInbox)
tracked.MessageSucceeded.SingleMessage<ReplayTestMessage>()
.ShouldNotBeNull("ReplayTestMessage should be successfully processed after replay");
afterReplay.DeadLetterEnvelopes.Any(dl => dl.Id == deadLetterId).ShouldBeFalse("Message should be removed from DLQ after successful replay (this should work for both durable and non-durable queues)");
afterIncoming.Any(env => env.Id == deadLetterId).ShouldBeFalse("Message should not remain in Incoming after successful processing");
afterIncoming.Any(env => env.Status == EnvelopeStatus.Incoming && env.Id == deadLetterId).ShouldBeFalse("Message should not remain in Incoming after successful processing");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,15 @@ internal class RabbitMqListener : RabbitMqChannelAgent, IListener, ISupportDeadL
private readonly RabbitMqTransport _transport;
private readonly IReceiver _receiver;
private readonly Lazy<ISender> _sender;
private string? _consumerId;

public RabbitMqListener(IWolverineRuntime runtime,
RabbitMqQueue queue, RabbitMqTransport transport, IReceiver receiver) : base(transport.UseSenderConnectionOnly ? transport.SendingConnection : transport.ListeningConnection,
runtime.LoggerFactory.CreateLogger<RabbitMqListener>())
{
Queue = queue;
Address = queue.Uri;
ConsumerAddress = Address;

_sender = new Lazy<ISender>(() => Queue.ResolveSender(runtime));
_cancellation.Register(() =>
Expand Down Expand Up @@ -215,7 +217,24 @@ public async Task CompleteAsync(ulong deliveryTag)
await Channel!.BasicAckAsync(deliveryTag, true, _cancellation);
}

public string? ConsumerId { get; set; }
public string? ConsumerId
{
get => _consumerId;
set
{
_consumerId = value;

if (value == null)
{
ConsumerAddress = Address;
}
else
{
ConsumerAddress = new Uri($"{Address}?consumer={_consumerId}");
}
}
}

public Uri BaseAddress => Queue.Uri;
public Uri ConsumerAddress => new Uri($"{Queue.Uri}?consumer={ConsumerId}");
public Uri ConsumerAddress { get; private set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public async Task<AgentCommands> ExecuteAsync(IWolverineRuntime runtime,
var envelopes = await _store.LoadPageOfGloballyOwnedIncomingAsync(_count.Destination, pageSize);
await _store.ReassignIncomingAsync(_settings.AssignedNodeNumber, envelopes);

_circuit.EnqueueDirectly(envelopes);
await _circuit.EnqueueDirectlyAsync(envelopes);
_logger.RecoveredIncoming(envelopes);

_logger.LogInformation("Successfully recovered {Count} messages from the inbox for listener {Listener}",
Expand Down
59 changes: 56 additions & 3 deletions src/Wolverine/Transports/ListeningAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface IListenerCircuit
ValueTask PauseAsync(TimeSpan pauseTime);
ValueTask StartAsync();

void EnqueueDirectly(IEnumerable<Envelope> envelopes);
Task EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes);
}

public interface IListeningAgent : IListenerCircuit
Expand Down Expand Up @@ -99,9 +99,14 @@ public void Dispose()

public int QueueCount => _receiver is ILocalQueue q ? q.QueueCount : 0;

public void EnqueueDirectly(IEnumerable<Envelope> envelopes)
public async Task EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)
{
if (_receiver is ILocalQueue queue)
if (_receiver is BufferedReceiver)
{
// Agent is latched if listener is null
await _receiver.ReceivedAsync(new RetryOnInlineChannelCallback(Listener, _runtime), envelopes.ToArray());
}
else if (_receiver is ILocalQueue queue)
{
var uniqueNodeId = _runtime.DurabilitySettings.AssignedNodeNumber;
foreach (var envelope in envelopes)
Expand All @@ -110,6 +115,11 @@ public void EnqueueDirectly(IEnumerable<Envelope> envelopes)
queue.Enqueue(envelope);
}
}
else if (_receiver is InlineReceiver inline)
{
// Agent is latched if listener is null
await inline.ReceivedAsync(new RetryOnInlineChannelCallback(Listener, _runtime), envelopes.ToArray());
}
else
{
throw new InvalidOperationException("There is no active, local queue for this listening endpoint at " +
Expand Down Expand Up @@ -290,4 +300,47 @@ public void Dispose()
_cancellation.Cancel();
_task.SafeDispose();
}
}

internal class RetryOnInlineChannelCallback : IListener
{
private readonly IListener _inner;
private readonly IWolverineRuntime _runtime;

public RetryOnInlineChannelCallback(IListener inner, IWolverineRuntime runtime)
{
_inner = inner;
_runtime = runtime;
}

public IHandlerPipeline? Pipeline => _inner.Pipeline;
public async ValueTask CompleteAsync(Envelope envelope)
{
try
{
await _runtime.Storage.Inbox.MarkIncomingEnvelopeAsHandledAsync(envelope);
}
catch (Exception e)
{
_runtime.Logger.LogError(e, "Error trying to mark a message as handled in the transactional inbox");
}

await _inner.CompleteAsync(envelope);
}

public ValueTask DeferAsync(Envelope envelope)
{
return _inner.DeferAsync(envelope);
}

public ValueTask DisposeAsync()
{
return new ValueTask();
}

public Uri Address => _inner.Address;
public ValueTask StopAsync()
{
return new ValueTask();
}
}
4 changes: 3 additions & 1 deletion src/Wolverine/Transports/Local/BufferedLocalQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ ValueTask IListenerCircuit.StartAsync()
return ValueTask.CompletedTask;
}

void IListenerCircuit.EnqueueDirectly(IEnumerable<Envelope> envelopes)
Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)
{
foreach (var envelope in envelopes)
{
EnqueueDirectly(envelope);
}

return Task.CompletedTask;
}

public Uri Destination { get; }
Expand Down
6 changes: 4 additions & 2 deletions src/Wolverine/Transports/Local/DurableLocalQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,16 @@ public DurableLocalQueue(Endpoint endpoint, WolverineRuntime runtime)

int IListenerCircuit.QueueCount => _receiver?.QueueCount ?? 0;

void IListenerCircuit.EnqueueDirectly(IEnumerable<Envelope> envelopes)
Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)
{
if (_receiver == null)
{
return;
return Task.CompletedTask;
}

foreach (var envelope in envelopes) _receiver.Enqueue(envelope);

return Task.CompletedTask;
}

public async ValueTask PauseAsync(TimeSpan pauseTime)
Expand Down
Loading