diff --git a/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs b/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs index 71a9f28a1..1e29b0748 100644 --- a/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs +++ b/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs @@ -54,6 +54,7 @@ public MockWolverineRuntime() { Tracker.Subscribe(this); MetricsAccumulator = new MetricsAccumulator(this); + Options.ServiceName = "Mock"; } public MetricsAccumulator MetricsAccumulator { get; } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1801_not_acking_on_consumer_failure.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1801_not_acking_on_consumer_failure.cs new file mode 100644 index 000000000..4ddaec763 --- /dev/null +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Bugs/Bug_1801_not_acking_on_consumer_failure.cs @@ -0,0 +1,38 @@ +using JasperFx.Core; +using Microsoft.Extensions.Hosting; +using Wolverine.Tracking; +using Xunit; + +namespace Wolverine.RabbitMQ.Tests.Bugs; + +public class Bug_1801_not_acking_on_consumer_failure +{ + [Fact] + public async Task try_it() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseRabbitMq().AutoProvision(); + + opts.ListenToRabbitQueue("will_error"); + opts.PublishMessage().ToRabbitQueue("will_error"); + }).StartAsync(); + + var tracked = await host.TrackActivity() + .IncludeExternalTransports() + .DoNotAssertOnExceptionsDetected() + .Timeout(1.Minutes()) + .SendMessageAndWaitAsync(new CauseError("Bang!")); + } +} + +public record CauseError(string Message); + +public static class CauseErrorHandler +{ + public static void Handle(CauseError msg) + { + throw new InvalidOperationException(msg.Message); + } +} \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Wolverine.RabbitMQ.Tests.csproj b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Wolverine.RabbitMQ.Tests.csproj index d9b4f22f4..92861d42e 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Wolverine.RabbitMQ.Tests.csproj +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/Wolverine.RabbitMQ.Tests.csproj @@ -17,6 +17,7 @@ + diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/channel_configuration.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/channel_configuration.cs index 2874ec4da..05b9af0c0 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/channel_configuration.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/channel_configuration.cs @@ -23,7 +23,7 @@ public void can_customize_channel_creation() wolverineOptions.PublisherConfirmationsEnabled.ShouldBe(true); wolverineOptions.PublisherConfirmationTrackingEnabled.ShouldBe(true); - wolverineOptions.ConsumerDispatchConcurrency.ShouldBeEquivalentTo(5); + wolverineOptions.ConsumerDispatchConcurrency.ShouldBeEquivalentTo((ushort)5); } [Fact] @@ -45,6 +45,6 @@ public void can_customize_channel_creation_additively() wolverineOptions.PublisherConfirmationsEnabled.ShouldBe(true); wolverineOptions.PublisherConfirmationTrackingEnabled.ShouldBe(false); - wolverineOptions.ConsumerDispatchConcurrency.ShouldBeEquivalentTo(2); + wolverineOptions.ConsumerDispatchConcurrency.ShouldBeEquivalentTo((ushort)2); } } \ No newline at end of file diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs index 6f0484d08..f00e28259 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/end_to_end.cs @@ -69,7 +69,7 @@ public async Task rabbitmq_transport_is_exposed_as_a_resource() opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); }); - var sources = publisher.Services.GetServices(); + var sources = publisher.Services.GetServices().OfType(); foreach (var source in sources) { var resources = await source.FindResources(); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exchange_queue_binding_model_setup_and_teardown_smoke_tests.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exchange_queue_binding_model_setup_and_teardown_smoke_tests.cs index 5ccb2524d..5c59e58c8 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exchange_queue_binding_model_setup_and_teardown_smoke_tests.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exchange_queue_binding_model_setup_and_teardown_smoke_tests.cs @@ -1,3 +1,4 @@ +using CoreTests.Runtime; using JasperFx.Core; using Microsoft.Extensions.Logging.Abstractions; using NSubstitute; @@ -38,9 +39,7 @@ public exchange_queue_binding_model_setup_and_teardown_smoke_tests() .BindExchange("fan1") .ToQueue("xqueue2", "key2"); - var wolverineRuntime = Substitute.For(); - wolverineRuntime.Logger.Returns(NullLogger.Instance); - wolverineRuntime.DurabilitySettings.Returns(new DurabilitySettings()); + var wolverineRuntime = new MockWolverineRuntime(); theTransport.TryBuildStatefulResource(wolverineRuntime, out var resource); theResource = resource; diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs index 063149781..6a5b81d2f 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/exclusive_listeners.cs @@ -62,7 +62,7 @@ public async Task exclusive_listeners_are_automatically_started_in_solo_mode() }).StartAsync(); var runtime = host.GetRuntime(); - runtime.Endpoints.ActiveListeners().Select(x => x.Endpoint.EndpointName) + runtime.Endpoints.ActiveListeners().Where(x => x.Uri.Scheme != "stub" ).Select(x => x.Endpoint.EndpointName) .OrderBy(x => x) .ShouldHaveTheSameElementsAs("one", "three", "two"); } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs index a4fcd828a..570ec5ca7 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/leader_pinned_listener.cs @@ -79,7 +79,7 @@ await host.WaitUntilAssignmentsChangeTo(w => w.ExpectRunningAgents(host, 2); }, 30.Seconds()); - var listeners = host.GetRuntime().Endpoints.ActiveListeners().Where(x => x.Endpoint.Role == EndpointRole.Application).Select(x => x.Uri).ToArray(); + var listeners = host.GetRuntime().Endpoints.ActiveListeners().Where(x => x.Uri.Scheme != "stub" && x.Endpoint.Role == EndpointRole.Application).Select(x => x.Uri).ToArray(); listeners.Length.ShouldBe(2); listeners.ShouldContain(new Uri("rabbitmq://queue/admin1")); listeners.ShouldContain(new Uri("rabbitmq://queue/admin2")); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs index 4fd473bb2..dea23b2d4 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ.Tests/multi_tenancy_through_virtual_hosts.cs @@ -45,7 +45,7 @@ public async Task InitializeAsync() opts.ServiceName = "main"; - opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup() + opts.UseRabbitMq().AutoProvision().AutoPurgeOnStartup().DisableDeadLetterQueueing() .AddTenant("one", "vh1") .AddTenant("two", "vh2") .AddTenant("three", "vh3"); @@ -78,7 +78,7 @@ public async Task InitializeAsync() { opts.Policies.DisableConventionalLocalRouting(); opts.ServiceName = "one"; - opts.UseRabbitMq(f => f.VirtualHost = "vh1"); + opts.UseRabbitMq(f => f.VirtualHost = "vh1").DisableDeadLetterQueueing(); opts.ListenToRabbitQueue("multi_incoming"); opts.Services.AddResourceSetupOnStartup(); @@ -91,7 +91,7 @@ public async Task InitializeAsync() { opts.Policies.DisableConventionalLocalRouting(); opts.ServiceName = "two"; - opts.UseRabbitMq(f => f.VirtualHost = "vh2"); + opts.UseRabbitMq(f => f.VirtualHost = "vh2").DisableDeadLetterQueueing(); opts.ListenToRabbitQueue("multi_incoming"); opts.Services.AddResourceSetupOnStartup(); @@ -102,7 +102,7 @@ public async Task InitializeAsync() { opts.Policies.DisableConventionalLocalRouting(); opts.ServiceName = "three"; - opts.UseRabbitMq(f => f.VirtualHost = "vh3"); + opts.UseRabbitMq(f => f.VirtualHost = "vh3").DisableDeadLetterQueueing(); opts.ListenToRabbitQueue("multi_incoming"); opts.Services.AddResourceSetupOnStartup(); @@ -155,6 +155,7 @@ public async Task send_message_to_a_specific_tenant() var message = new MultiTenantMessage(Guid.NewGuid()); var session = await _fixture.Main .TrackActivity() + .Timeout(15.Seconds()) .AlsoTrack(_fixture.One, _fixture.Two, _fixture.Three) .WaitForMessageToBeReceivedAt(_fixture.Two) .SendMessageAndWaitAsync(message, new DeliveryOptions{TenantId = "two"}); diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqChannelCallback.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqChannelCallback.cs index acaa76fe1..fa120973b 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqChannelCallback.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqChannelCallback.cs @@ -99,6 +99,8 @@ private async Task moveToErrorQueueAsync(RabbitMqEnvelope envelope, Cancellation { if (envelope.RabbitMqListener.Channel is not null) { + // For idempotency + envelope.HasBeenAcked = true; await envelope.RabbitMqListener.Channel.BasicNackAsync(envelope.DeliveryTag, false, false, token); } } diff --git a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs index 7b2f99e6a..47045517e 100644 --- a/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs +++ b/src/Transports/RabbitMQ/Wolverine.RabbitMQ/Internal/RabbitMqExchange.cs @@ -94,7 +94,7 @@ internal override string RoutingKey() internal async Task DeclareAsync(IChannel channel, ILogger logger) { - if (HasDeclared || DeclaredName == string.Empty) + if (DeclaredName == string.Empty) { return; } diff --git a/src/Wolverine/Envelope.Internals.cs b/src/Wolverine/Envelope.Internals.cs index a035fed8e..fe8878161 100644 --- a/src/Wolverine/Envelope.Internals.cs +++ b/src/Wolverine/Envelope.Internals.cs @@ -84,6 +84,8 @@ internal Envelope(object message, IMessageSerializer writer) public bool IsResponse { get; set; } public Exception? Failure { get; set; } internal Envelope[]? Batch { get; set; } + + internal bool HasBeenAcked { get; set; } internal void StartTiming() { diff --git a/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs b/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs index a864de071..e7615ab0f 100644 --- a/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs +++ b/src/Wolverine/ErrorHandling/MoveToErrorQueue.cs @@ -43,6 +43,8 @@ await lifecycle.SendFailureAcknowledgementAsync( } await lifecycle.MoveToDeadLetterQueueAsync(Exception); + + await lifecycle.CompleteAsync(); activity?.AddEvent(new ActivityEvent(WolverineTracing.MovedToErrorQueue)); diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index 3e61d52e8..8bcb016a6 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -184,17 +184,20 @@ public async Task AssertAnyRequiredResponseWasGenerated() } } - public ValueTask CompleteAsync() + public async ValueTask CompleteAsync() { if (_channel == null || Envelope == null) { throw new InvalidOperationException("No Envelope is active for this context"); } - return _channel.CompleteAsync(Envelope); + if (Envelope.HasBeenAcked) return; + + await _channel.CompleteAsync(Envelope); + Envelope.HasBeenAcked = true; } - public ValueTask DeferAsync() + public async ValueTask DeferAsync() { if (_channel == null || Envelope == null) { @@ -202,7 +205,7 @@ public ValueTask DeferAsync() } Runtime.MessageTracking.Requeued(Envelope); - return _channel.DeferAsync(Envelope); + await _channel.DeferAsync(Envelope); } public async Task ReScheduleAsync(DateTimeOffset scheduledTime)