Skip to content

Commit

Permalink
Adjusting the requeue behavior for durable, local queues. Closes GH-826
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Apr 26, 2024
1 parent 9af4d22 commit d4d9f72
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
using IntegrationTests;
using JasperFx.Core;
using Marten;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Npgsql;
using Shouldly;
using Weasel.Postgresql;
using Wolverine;
using Wolverine.ErrorHandling;
using Wolverine.Marten;
using Wolverine.Runtime.Interop.MassTransit;
using Wolverine.Tracking;

namespace MartenTests.Bugs;

public class Bug_826_issue_with_paused_listener
{
private async Task dropSchema()
{
using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();
await conn.DropSchemaAsync("bug826");
await conn.CloseAsync();
}

[Fact]
public async Task can_resume_listening()
{
await dropSchema();

var builder = Host.CreateDefaultBuilder();

builder.UseWolverine(options =>
{
options.LocalQueueFor<ThisMeansTrouble>().Sequential();
options.OnException<Exception>()
.Requeue().AndPauseProcessing(5.Seconds());

options.Durability.Mode = DurabilityMode.Solo;

options.Policies.AutoApplyTransactions();
options.Policies.UseDurableLocalQueues();
options.Policies.UseDurableOutboxOnAllSendingEndpoints();

options.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "bug826";
m.DisableNpgsqlLogging = true;
})
.UseLightweightSessions()
.IntegrateWithWolverine();
});

using var host = await builder.StartAsync();

await host.TrackActivity()
.WaitForMessageToBeReceivedAt<ThisMeansTrouble>(host)
.Timeout(20.Seconds())
.PublishMessageAndWaitAsync(new ThisMeansTrouble());
}
}

public record ThisMeansTrouble();

public class UnreliableHandler
{
public static bool HasFailed = false;

public static void Handle(ThisMeansTrouble message, ILogger logger, Envelope envelope)
{
logger.LogWarning("Handler called");

if (HasFailed)
{
envelope.Attempts.ShouldBe(2);
return;
}
HasFailed = true;
throw new Exception();
}
}
6 changes: 6 additions & 0 deletions src/Wolverine/Envelope.Internals.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Wolverine.Runtime;
using Wolverine.Runtime.Serialization;
using Wolverine.Transports;
using Wolverine.Transports.Local;
using Wolverine.Transports.Sending;
using Wolverine.Util;

Expand Down Expand Up @@ -278,4 +279,9 @@ internal ValueTask PersistAsync(IEnvelopeTransaction transaction)

return ValueTask.CompletedTask;
}

internal bool IsFromLocalDurableQueue()
{
return Sender is DurableLocalQueue;
}
}
16 changes: 10 additions & 6 deletions src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPip
}, _logger,
_settings.Cancellation);

_receivingOne = new RetryBlock<Envelope>((e, _) => receiveOneAsync(e), _logger, _settings.Cancellation);
_receivingOne = new RetryBlock<Envelope>((e, _) => deferOneAsync(e), _logger, _settings.Cancellation);

if (endpoint.TryBuildDeadLetterSender(runtime, out var dlq))
{
Expand Down Expand Up @@ -144,12 +144,16 @@ public ValueTask CompleteAsync(Envelope envelope)

public ValueTask DeferAsync(Envelope envelope)
{
if (_latched)
if (_latched && !envelope.IsFromLocalDurableQueue())
{
return new ValueTask(executeWithRetriesAsync(() => receiveOneAsync(envelope)));
return new ValueTask(executeWithRetriesAsync(() => deferOneAsync(envelope)));
}

envelope.Attempts++;
// GH-826, the attempts are already incremented from the executor
if (!envelope.IsFromLocalDurableQueue())
{
envelope.Attempts++;
}

Enqueue(envelope);

Expand Down Expand Up @@ -185,7 +189,7 @@ public async ValueTask ReceivedAsync(IListener listener, Envelope envelope)

if (_latched)
{
await executeWithRetriesAsync(() => receiveOneAsync(envelope));
await executeWithRetriesAsync(() => deferOneAsync(envelope));
return;
}

Expand Down Expand Up @@ -258,7 +262,7 @@ public Task MoveToScheduledUntilAsync(Envelope envelope, DateTimeOffset time)
return _scheduleExecution.PostAsync(envelope);
}

private async Task receiveOneAsync(Envelope envelope)
private async Task deferOneAsync(Envelope envelope)
{
if (_latched && envelope.Listener != null)
{
Expand Down

0 comments on commit d4d9f72

Please sign in to comment.