Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using JasperFx.Resources;
using RabbitMQ.Client;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using Shouldly;
using Wolverine.RabbitMQ.Internal;
using Wolverine.Runtime;
Expand Down Expand Up @@ -146,6 +149,120 @@ public async Task move_failed_messages_to_the_dlq()
throw new Exception("Never got a message in the dead letter queue");
}

[Fact]
public async Task uses_overridden_dead_letter_exchange_per_queue_when_transport_has_custom_default()
{
var queueName = "queue-alpha";
var defaultDeadLetterQueueName = "default-dlq";
var defaultDeadLetterExchangeName = "dlx-exchange";
var overriddenDeadLetterQueueName = "queue-alpha-dlx";

var options = new WolverineOptions();
options.UseRabbitMq()
.CustomizeDeadLetterQueueing(new DeadLetterQueue(defaultDeadLetterQueueName)
{
ExchangeName = defaultDeadLetterExchangeName
});

options.ListenToRabbitQueue(queueName, q => q.QueueType = QueueType.quorum)
.DeadLetterQueueing(new DeadLetterQueue(overriddenDeadLetterQueueName));

var runtime = Substitute.For<IWolverineRuntime>();
runtime.Options.Returns(options);

var transport = options.RabbitMqTransport();
var queue = transport.Queues[transport.MaybeCorrectName(queueName)];

queue.Compile(runtime);

var channel = Substitute.For<IChannel>();
channel.QueueDeclareAsync(Arg.Any<string>(), Arg.Any<bool>(), Arg.Any<bool>(), Arg.Any<bool>(),
Arg.Any<IDictionary<string, object>>())
.Returns(Task.FromResult(new QueueDeclareOk(queue.QueueName, 0, 0)));

await queue.DeclareAsync(channel, NullLogger.Instance);

queue.Arguments[RabbitMqTransport.DeadLetterQueueHeader]
.ShouldBe(overriddenDeadLetterQueueName);
}

[Fact]
public void keeps_per_queue_dead_letter_exchange_when_transport_has_custom_default()
{
var queueName = "queue-beta";
var defaultDeadLetterQueueName = "default-dlq";
var defaultDeadLetterExchangeName = "dlx-exchange";
var overriddenDeadLetterQueueName = "queue-beta-dlx";
var overriddenDeadLetterExchangeName = "queue-beta-dlx-exchange";

var options = new WolverineOptions();
var transportExpression = options.UseRabbitMq()
.CustomizeDeadLetterQueueing(new DeadLetterQueue(defaultDeadLetterQueueName)
{
ExchangeName = defaultDeadLetterExchangeName
});
var transport = transportExpression.Transport;

options.ListenToRabbitQueue(queueName)
.DeadLetterQueueing(new DeadLetterQueue(overriddenDeadLetterQueueName)
{
ExchangeName = overriddenDeadLetterExchangeName
});

var queue = transport.Queues[transport.MaybeCorrectName(queueName)];

var runtime = Substitute.For<IWolverineRuntime>();
runtime.Options.Returns(options);

queue.Compile(runtime);

queue.DeadLetterQueue!.ExchangeName.ShouldBe(overriddenDeadLetterExchangeName);
transport.DeadLetterQueue.ExchangeName.ShouldBe(defaultDeadLetterExchangeName);
}

[Fact]
public async Task default_and_override_queues_keep_their_own_dlx_exchange_on_declare()
{
var defaultExchange = "default-dlx-exchange";
var defaultQueue = "queue-default";
var overrideQueue = "queue-override";
var overrideExchange = "override-dlx-exchange";

var options = new WolverineOptions();
var transport = options.UseRabbitMq()
.CustomizeDeadLetterQueueing(new DeadLetterQueue("default-dlq")
{
ExchangeName = defaultExchange
}).Transport;

options.ListenToRabbitQueue(defaultQueue);
options.ListenToRabbitQueue(overrideQueue)
.DeadLetterQueueing(new DeadLetterQueue(overrideQueue + "-dlq")
{
ExchangeName = overrideExchange
});

var runtime = Substitute.For<IWolverineRuntime>();
runtime.Options.Returns(options);

var defaultEndpoint = transport.Queues[transport.MaybeCorrectName(defaultQueue)];
var overrideEndpoint = transport.Queues[transport.MaybeCorrectName(overrideQueue)];

defaultEndpoint.Compile(runtime);
overrideEndpoint.Compile(runtime);

var channel = Substitute.For<IChannel>();
channel.QueueDeclareAsync(Arg.Any<string>(), Arg.Any<bool>(), Arg.Any<bool>(), Arg.Any<bool>(),
Arg.Any<IDictionary<string, object>>())
.Returns(Task.FromResult(new QueueDeclareOk(defaultQueue, 0, 0)));

await defaultEndpoint.DeclareAsync(channel, NullLogger.Instance);
await overrideEndpoint.DeclareAsync(channel, NullLogger.Instance);

defaultEndpoint.Arguments[RabbitMqTransport.DeadLetterQueueHeader].ShouldBe(defaultExchange);
overrideEndpoint.Arguments[RabbitMqTransport.DeadLetterQueueHeader].ShouldBe(overrideExchange);
}

[Fact]
public async Task overriding_dead_letter_queue_for_specific_queue()
{
Expand Down
11 changes: 11 additions & 0 deletions src/Transports/RabbitMQ/Wolverine.RabbitMQ/DeadLetterQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,17 @@ public string? BindingName

public Action<RabbitMqExchange>? ConfigureExchange { get; set; }

public DeadLetterQueue Clone()
{
return new DeadLetterQueue(QueueName, Mode)
{
ExchangeName = ExchangeName,
BindingName = BindingName,
ConfigureQueue = ConfigureQueue,
ConfigureExchange = ConfigureExchange
};
}

protected bool Equals(DeadLetterQueue other)
{
return _queueName == other._queueName && ExchangeName == other.ExchangeName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal RabbitMqQueue(string queueName, RabbitMqTransport parent, EndpointRole

if (QueueName != _parent.DeadLetterQueue.QueueName)
{
DeadLetterQueue = _parent.DeadLetterQueue;
DeadLetterQueue = _parent.DeadLetterQueue.Clone();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public RabbitMqListenerConfiguration DeadLetterQueueing(DeadLetterQueue dlq)
{
add(e =>
{
e.DeadLetterQueue = dlq;
e.DeadLetterQueue = dlq.Clone();
});

return this;
Expand Down
Loading