Skip to content

Commit

Permalink
Fixed the issue with replaying messages that get DLQ'd from the exter…
Browse files Browse the repository at this point in the history
…nal messages. Closes GH-1225
  • Loading branch information
jeremydmiller committed Jan 21, 2025
1 parent 80bd290 commit 7981edf
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Wolverine.Persistence.Durability;
using Wolverine.Postgresql;
using Wolverine.RDBMS.Transport;
using Wolverine.Runtime.Handlers;
using Wolverine.Tracking;

namespace PostgresqlTests.Transport;
Expand Down Expand Up @@ -181,6 +182,47 @@ public async Task end_to_end_default_variable_message_types_customize_table_in_e
envelope.Destination.ShouldBe(new Uri("external-table://external.incoming1/"));
}

[Fact]
public async Task pull_in_message_that_goes_to_dead_letter_queue_and_replay_it()
{
using var host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.Durability.Mode = DurabilityMode.Solo;
opts.Durability.ScheduledJobPollingTime = 1.Seconds();

opts.UsePostgresqlPersistenceAndTransport(Servers.PostgresConnectionString, "external");

opts.ListenForMessagesFromExternalDatabaseTable("external", "incoming4", table =>
{
table.IdColumnName = "pk";
table.TimestampColumnName = "added";
table.JsonBodyColumnName = "message_body";
table.MessageType = typeof(BlowsUpMessage);
table.PollingInterval = 1.Seconds();
});

}).StartAsync();

// Rig it up to fail
var waiter = BlowsUpMessageHandler.WaiterForCall(true);

await host.SendMessageThroughExternalTable("external.incoming4", new BlowsUpMessage());
var storage = host.GetRuntime().Storage;
Guid[] ids = new Guid[0];
while (!ids.Any())
{
var queued = await storage.DeadLetters.QueryDeadLetterEnvelopesAsync(new DeadLetterEnvelopeQueryParameters());
ids = queued.DeadLetterEnvelopes.Select(x => x.Envelope.Id).ToArray();
}

// need to reset it
var dlq = BlowsUpMessageHandler.WaiterForCall(false);
await storage.DeadLetters.MarkDeadLetterEnvelopesAsReplayableAsync(ids);
await dlq;
BlowsUpMessageHandler.LastReceived.ShouldNotBeNull();
}

}

public static class Bootstrapping
Expand Down Expand Up @@ -279,5 +321,33 @@ public record BlowsUpMessage;

public static class BlowsUpMessageHandler
{
public static void Handle(BlowsUpMessage message) => throw new Exception("You stink!");
public static TaskCompletionSource Waiter { get; private set; } = new();

public static void Configure(HandlerChain chain)
{
chain.OnAnyException().MoveToErrorQueue();
}

public static bool WillBlowUp { get; set; } = true;

public static Task WaiterForCall(bool shouldThrow)
{
LastReceived = null;
WillBlowUp = shouldThrow;
Waiter = new TaskCompletionSource();
return Waiter.Task;
}

public static void Handle(BlowsUpMessage message)
{
if (WillBlowUp)
{
throw new Exception("You stink!");
}

LastReceived = message;
Waiter.SetResult();
}

public static BlowsUpMessage LastReceived { get; set; }
}
3 changes: 3 additions & 0 deletions src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ public async Task PollForMessagesFromExternalTablesAsync(IListener listener,
{
envelope.Status = EnvelopeStatus.Incoming;
envelope.OwnerId = runtime.DurabilitySettings.AssignedNodeNumber;

// Fix for GH-1225
envelope.Source = externalTable.TableName.QualifiedName;
}

var tx = await conn.BeginTransactionAsync(token);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Weasel.Core;
using Wolverine.Configuration;
using Wolverine.Runtime;
using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports;
using Wolverine.Transports.Sending;
using Wolverine.Util;
Expand Down
2 changes: 1 addition & 1 deletion src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPip

public bool ShouldPersistBeforeProcessing { get; set; }

public Uri Uri { get; }
public Uri Uri { get; set; }

public async ValueTask DisposeAsync()
{
Expand Down
22 changes: 20 additions & 2 deletions src/Wolverine/Transports/IReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace Wolverine.Transports;
using Wolverine.Runtime.WorkerQueues;

namespace Wolverine.Transports;

public interface IReceiver : IDisposable
{
Expand All @@ -8,7 +10,7 @@ public interface IReceiver : IDisposable
ValueTask DrainAsync();
}

internal class ReceiverWithRules : IReceiver
internal class ReceiverWithRules : IReceiver, ILocalQueue
{
public ReceiverWithRules(IReceiver inner, IEnumerable<IEnvelopeRule> rules)
{
Expand Down Expand Up @@ -52,4 +54,20 @@ public ValueTask DrainAsync()
{
return Inner.DrainAsync();
}

public void Enqueue(Envelope envelope)
{
if (Inner is ILocalQueue queue)
{
queue.Enqueue(envelope);
}
else
{
throw new InvalidOperationException("There is no active, local queue for this listening endpoint at " +
envelope.Destination);
}
}

public int QueueCount => Inner is ILocalQueue q ? q.QueueCount : 0;
public Uri Uri => Inner is ILocalQueue q ? q.Uri : new Uri("none://none");
}

0 comments on commit 7981edf

Please sign in to comment.