diff --git a/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs b/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs index 32f64423..95d864a2 100644 --- a/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs +++ b/src/Persistence/PostgresqlTests/Transport/external_message_tables.cs @@ -18,6 +18,7 @@ using Wolverine.Persistence.Durability; using Wolverine.Postgresql; using Wolverine.RDBMS.Transport; +using Wolverine.Runtime.Handlers; using Wolverine.Tracking; namespace PostgresqlTests.Transport; @@ -181,6 +182,47 @@ public async Task end_to_end_default_variable_message_types_customize_table_in_e envelope.Destination.ShouldBe(new Uri("external-table://external.incoming1/")); } + [Fact] + public async Task pull_in_message_that_goes_to_dead_letter_queue_and_replay_it() + { + using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Durability.Mode = DurabilityMode.Solo; + opts.Durability.ScheduledJobPollingTime = 1.Seconds(); + + opts.UsePostgresqlPersistenceAndTransport(Servers.PostgresConnectionString, "external"); + + opts.ListenForMessagesFromExternalDatabaseTable("external", "incoming4", table => + { + table.IdColumnName = "pk"; + table.TimestampColumnName = "added"; + table.JsonBodyColumnName = "message_body"; + table.MessageType = typeof(BlowsUpMessage); + table.PollingInterval = 1.Seconds(); + }); + + }).StartAsync(); + + // Rig it up to fail + var waiter = BlowsUpMessageHandler.WaiterForCall(true); + + await host.SendMessageThroughExternalTable("external.incoming4", new BlowsUpMessage()); + var storage = host.GetRuntime().Storage; + Guid[] ids = new Guid[0]; + while (!ids.Any()) + { + var queued = await storage.DeadLetters.QueryDeadLetterEnvelopesAsync(new DeadLetterEnvelopeQueryParameters()); + ids = queued.DeadLetterEnvelopes.Select(x => x.Envelope.Id).ToArray(); + } + + // need to reset it + var dlq = BlowsUpMessageHandler.WaiterForCall(false); + await storage.DeadLetters.MarkDeadLetterEnvelopesAsReplayableAsync(ids); + await dlq; + BlowsUpMessageHandler.LastReceived.ShouldNotBeNull(); + } + } public static class Bootstrapping @@ -279,5 +321,33 @@ public record BlowsUpMessage; public static class BlowsUpMessageHandler { - public static void Handle(BlowsUpMessage message) => throw new Exception("You stink!"); + public static TaskCompletionSource Waiter { get; private set; } = new(); + + public static void Configure(HandlerChain chain) + { + chain.OnAnyException().MoveToErrorQueue(); + } + + public static bool WillBlowUp { get; set; } = true; + + public static Task WaiterForCall(bool shouldThrow) + { + LastReceived = null; + WillBlowUp = shouldThrow; + Waiter = new TaskCompletionSource(); + return Waiter.Task; + } + + public static void Handle(BlowsUpMessage message) + { + if (WillBlowUp) + { + throw new Exception("You stink!"); + } + + LastReceived = message; + Waiter.SetResult(); + } + + public static BlowsUpMessage LastReceived { get; set; } } \ No newline at end of file diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs index aedc29ee..18703885 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Polling.cs @@ -37,6 +37,9 @@ public async Task PollForMessagesFromExternalTablesAsync(IListener listener, { envelope.Status = EnvelopeStatus.Incoming; envelope.OwnerId = runtime.DurabilitySettings.AssignedNodeNumber; + + // Fix for GH-1225 + envelope.Source = externalTable.TableName.QualifiedName; } var tx = await conn.BeginTransactionAsync(token); diff --git a/src/Persistence/Wolverine.RDBMS/Transport/ExternalMessageTable.cs b/src/Persistence/Wolverine.RDBMS/Transport/ExternalMessageTable.cs index 80e9330a..7aee1d11 100644 --- a/src/Persistence/Wolverine.RDBMS/Transport/ExternalMessageTable.cs +++ b/src/Persistence/Wolverine.RDBMS/Transport/ExternalMessageTable.cs @@ -3,6 +3,7 @@ using Weasel.Core; using Wolverine.Configuration; using Wolverine.Runtime; +using Wolverine.Runtime.WorkerQueues; using Wolverine.Transports; using Wolverine.Transports.Sending; using Wolverine.Util; diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index edc31407..7743679b 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -125,7 +125,7 @@ public DurableReceiver(Endpoint endpoint, IWolverineRuntime runtime, IHandlerPip public bool ShouldPersistBeforeProcessing { get; set; } - public Uri Uri { get; } + public Uri Uri { get; set; } public async ValueTask DisposeAsync() { diff --git a/src/Wolverine/Transports/IReceiver.cs b/src/Wolverine/Transports/IReceiver.cs index 44f761b6..1f8079a0 100644 --- a/src/Wolverine/Transports/IReceiver.cs +++ b/src/Wolverine/Transports/IReceiver.cs @@ -1,4 +1,6 @@ -namespace Wolverine.Transports; +using Wolverine.Runtime.WorkerQueues; + +namespace Wolverine.Transports; public interface IReceiver : IDisposable { @@ -8,7 +10,7 @@ public interface IReceiver : IDisposable ValueTask DrainAsync(); } -internal class ReceiverWithRules : IReceiver +internal class ReceiverWithRules : IReceiver, ILocalQueue { public ReceiverWithRules(IReceiver inner, IEnumerable rules) { @@ -52,4 +54,20 @@ public ValueTask DrainAsync() { return Inner.DrainAsync(); } + + public void Enqueue(Envelope envelope) + { + if (Inner is ILocalQueue queue) + { + queue.Enqueue(envelope); + } + else + { + throw new InvalidOperationException("There is no active, local queue for this listening endpoint at " + + envelope.Destination); + } + } + + public int QueueCount => Inner is ILocalQueue q ? q.QueueCount : 0; + public Uri Uri => Inner is ILocalQueue q ? q.Uri : new Uri("none://none"); } \ No newline at end of file