diff --git a/src/Persistence/MartenTests/Bugs/Bug_826_issue_with_paused_listener.cs b/src/Persistence/MartenTests/Bugs/Bug_826_issue_with_paused_listener.cs new file mode 100644 index 000000000..3a14ea694 --- /dev/null +++ b/src/Persistence/MartenTests/Bugs/Bug_826_issue_with_paused_listener.cs @@ -0,0 +1,83 @@ +using IntegrationTests; +using JasperFx.Core; +using Marten; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Npgsql; +using Shouldly; +using Weasel.Postgresql; +using Wolverine; +using Wolverine.ErrorHandling; +using Wolverine.Marten; +using Wolverine.Runtime.Interop.MassTransit; +using Wolverine.Tracking; + +namespace MartenTests.Bugs; + +public class Bug_826_issue_with_paused_listener +{ + private async Task dropSchema() + { + using var conn = new NpgsqlConnection(Servers.PostgresConnectionString); + await conn.OpenAsync(); + await conn.DropSchemaAsync("bug826"); + await conn.CloseAsync(); + } + + [Fact] + public async Task can_resume_listening() + { + await dropSchema(); + + var builder = Host.CreateDefaultBuilder(); + + builder.UseWolverine(options => + { + options.LocalQueueFor().Sequential(); + options.OnException() + .Requeue().AndPauseProcessing(5.Seconds()); + + options.Durability.Mode = DurabilityMode.Solo; + + options.Policies.AutoApplyTransactions(); + options.Policies.UseDurableLocalQueues(); + options.Policies.UseDurableOutboxOnAllSendingEndpoints(); + + options.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "bug826"; + m.DisableNpgsqlLogging = true; + }) + .UseLightweightSessions() + .IntegrateWithWolverine(); + }); + + using var host = await builder.StartAsync(); + + await host.TrackActivity() + .WaitForMessageToBeReceivedAt(host) + .Timeout(20.Seconds()) + .PublishMessageAndWaitAsync(new ThisMeansTrouble()); + } +} + +public record ThisMeansTrouble(); + +public class UnreliableHandler +{ + public static bool HasFailed = false; + + public static void Handle(ThisMeansTrouble message, ILogger logger, Envelope envelope) + { + logger.LogWarning("Handler called"); + + if (HasFailed) + { + envelope.Attempts.ShouldBe(2); + return; + } + HasFailed = true; + throw new Exception(); + } +} \ No newline at end of file diff --git a/src/Wolverine/Envelope.Internals.cs b/src/Wolverine/Envelope.Internals.cs index 3779ecaf0..a6eeea9cd 100644 --- a/src/Wolverine/Envelope.Internals.cs +++ b/src/Wolverine/Envelope.Internals.cs @@ -4,6 +4,7 @@ using Wolverine.Runtime; using Wolverine.Runtime.Serialization; using Wolverine.Transports; +using Wolverine.Transports.Local; using Wolverine.Transports.Sending; using Wolverine.Util; @@ -278,4 +279,9 @@ internal ValueTask PersistAsync(IEnvelopeTransaction transaction) return ValueTask.CompletedTask; } + + internal bool IsFromLocalDurableQueue() + { + return Sender is DurableLocalQueue; + } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 6d5630093..461f0144f 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -104,7 +104,7 @@ public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPip }, _logger, _settings.Cancellation); - _receivingOne = new RetryBlock((e, _) => receiveOneAsync(e), _logger, _settings.Cancellation); + _receivingOne = new RetryBlock((e, _) => deferOneAsync(e), _logger, _settings.Cancellation); if (endpoint.TryBuildDeadLetterSender(runtime, out var dlq)) { @@ -144,12 +144,16 @@ public ValueTask CompleteAsync(Envelope envelope) public ValueTask DeferAsync(Envelope envelope) { - if (_latched) + if (_latched && !envelope.IsFromLocalDurableQueue()) { - return new ValueTask(executeWithRetriesAsync(() => receiveOneAsync(envelope))); + return new ValueTask(executeWithRetriesAsync(() => deferOneAsync(envelope))); } - envelope.Attempts++; + // GH-826, the attempts are already incremented from the executor + if (!envelope.IsFromLocalDurableQueue()) + { + envelope.Attempts++; + } Enqueue(envelope); @@ -185,7 +189,7 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope) if (_latched) { - await executeWithRetriesAsync(() => receiveOneAsync(envelope)); + await executeWithRetriesAsync(() => deferOneAsync(envelope)); return; } @@ -258,7 +262,7 @@ public Task MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time) return _scheduleExecution.PostAsync(envelope); } - private async Task receiveOneAsync(Envelope envelope) + private async Task deferOneAsync(Envelope envelope) { if (_latched && envelope.Listener != null) {