diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/native_dead_letter_queue_mechanics.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/native_dead_letter_queue_mechanics.cs index ad83d6c48..e33115542 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/native_dead_letter_queue_mechanics.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/native_dead_letter_queue_mechanics.cs @@ -2,6 +2,9 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using JasperFx.Resources; +using RabbitMQ.Client; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; using Shouldly; using Wolverine.RabbitMQ.Internal; using Wolverine.Runtime; @@ -146,6 +149,120 @@ public async Task move_failed_messages_to_the_dlq() throw new Exception("Never got a message in the dead letter queue"); } + [Fact] + public async Task uses_overridden_dead_letter_exchange_per_queue_when_transport_has_custom_default() + { + var queueName = "queue-alpha"; + var defaultDeadLetterQueueName = "default-dlq"; + var defaultDeadLetterExchangeName = "dlx-exchange"; + var overriddenDeadLetterQueueName = "queue-alpha-dlx"; + + var options = new WolverineOptions(); + options.UseRabbitMq() + .CustomizeDeadLetterQueueing(new DeadLetterQueue(defaultDeadLetterQueueName) + { + ExchangeName = defaultDeadLetterExchangeName + }); + + options.ListenToRabbitQueue(queueName, q => q.QueueType = QueueType.quorum) + .DeadLetterQueueing(new DeadLetterQueue(overriddenDeadLetterQueueName)); + + var runtime = Substitute.For(); + runtime.Options.Returns(options); + + var transport = options.RabbitMqTransport(); + var queue = transport.Queues[transport.MaybeCorrectName(queueName)]; + + queue.Compile(runtime); + + var channel = Substitute.For(); + channel.QueueDeclareAsync(Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any>()) + .Returns(Task.FromResult(new QueueDeclareOk(queue.QueueName, 0, 0))); + + await queue.DeclareAsync(channel, NullLogger.Instance); + + queue.Arguments[RabbitMqTransport.DeadLetterQueueHeader] + .ShouldBe(overriddenDeadLetterQueueName); + } + + [Fact] + public void keeps_per_queue_dead_letter_exchange_when_transport_has_custom_default() + { + var queueName = "queue-beta"; + var defaultDeadLetterQueueName = "default-dlq"; + var defaultDeadLetterExchangeName = "dlx-exchange"; + var overriddenDeadLetterQueueName = "queue-beta-dlx"; + var overriddenDeadLetterExchangeName = "queue-beta-dlx-exchange"; + + var options = new WolverineOptions(); + var transportExpression = options.UseRabbitMq() + .CustomizeDeadLetterQueueing(new DeadLetterQueue(defaultDeadLetterQueueName) + { + ExchangeName = defaultDeadLetterExchangeName + }); + var transport = transportExpression.Transport; + + options.ListenToRabbitQueue(queueName) + .DeadLetterQueueing(new DeadLetterQueue(overriddenDeadLetterQueueName) + { + ExchangeName = overriddenDeadLetterExchangeName + }); + + var queue = transport.Queues[transport.MaybeCorrectName(queueName)]; + + var runtime = Substitute.For(); + runtime.Options.Returns(options); + + queue.Compile(runtime); + + queue.DeadLetterQueue!.ExchangeName.ShouldBe(overriddenDeadLetterExchangeName); + transport.DeadLetterQueue.ExchangeName.ShouldBe(defaultDeadLetterExchangeName); + } + + [Fact] + public async Task default_and_override_queues_keep_their_own_dlx_exchange_on_declare() + { + var defaultExchange = "default-dlx-exchange"; + var defaultQueue = "queue-default"; + var overrideQueue = "queue-override"; + var overrideExchange = "override-dlx-exchange"; + + var options = new WolverineOptions(); + var transport = options.UseRabbitMq() + .CustomizeDeadLetterQueueing(new DeadLetterQueue("default-dlq") + { + ExchangeName = defaultExchange + }).Transport; + + options.ListenToRabbitQueue(defaultQueue); + options.ListenToRabbitQueue(overrideQueue) + .DeadLetterQueueing(new DeadLetterQueue(overrideQueue + "-dlq") + { + ExchangeName = overrideExchange + }); + + var runtime = Substitute.For(); + runtime.Options.Returns(options); + + var defaultEndpoint = transport.Queues[transport.MaybeCorrectName(defaultQueue)]; + var overrideEndpoint = transport.Queues[transport.MaybeCorrectName(overrideQueue)]; + + defaultEndpoint.Compile(runtime); + overrideEndpoint.Compile(runtime); + + var channel = Substitute.For(); + channel.QueueDeclareAsync(Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), + Arg.Any>()) + .Returns(Task.FromResult(new QueueDeclareOk(defaultQueue, 0, 0))); + + await defaultEndpoint.DeclareAsync(channel, NullLogger.Instance); + await overrideEndpoint.DeclareAsync(channel, NullLogger.Instance); + + defaultEndpoint.Arguments[RabbitMqTransport.DeadLetterQueueHeader].ShouldBe(defaultExchange); + overrideEndpoint.Arguments[RabbitMqTransport.DeadLetterQueueHeader].ShouldBe(overrideExchange); + } + [Fact] public async Task overriding_dead_letter_queue_for_specific_queue() { diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/DeadLetterQueue.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/DeadLetterQueue.cs index 7ee6b4552..4c9051ba5 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/DeadLetterQueue.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/DeadLetterQueue.cs @@ -63,6 +63,17 @@ public string? BindingName public Action? ConfigureExchange { get; set; } + public DeadLetterQueue Clone() + { + return new DeadLetterQueue(QueueName, Mode) + { + ExchangeName = ExchangeName, + BindingName = BindingName, + ConfigureQueue = ConfigureQueue, + ConfigureExchange = ConfigureExchange + }; + } + protected bool Equals(DeadLetterQueue other) { return _queueName == other._queueName && ExchangeName == other.ExchangeName; diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqQueue.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqQueue.cs index 2710f8c44..86a0da04f 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqQueue.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqQueue.cs @@ -43,7 +43,7 @@ internal RabbitMqQueue(string queueName, RabbitMqTransport parent, EndpointRole if (QueueName != _parent.DeadLetterQueue.QueueName) { - DeadLetterQueue = _parent.DeadLetterQueue; + DeadLetterQueue = _parent.DeadLetterQueue.Clone(); } } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqListenerConfiguration.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqListenerConfiguration.cs index 3474f9902..7db59efbc 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqListenerConfiguration.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/RabbitMqListenerConfiguration.cs @@ -179,7 +179,7 @@ public RabbitMqListenerConfiguration DeadLetterQueueing(DeadLetterQueue dlq) { add(e => { - e.DeadLetterQueue = dlq; + e.DeadLetterQueue = dlq.Clone(); }); return this;