diff --git a/docs/guide/durability/sagas.md b/docs/guide/durability/sagas.md index c6965e56f..2acaec984 100644 --- a/docs/guide/durability/sagas.md +++ b/docs/guide/durability/sagas.md @@ -697,3 +697,75 @@ When an `OrderPlaced` message is published, both sagas will be started independe ::: warning In `Separated` mode, messages routed to multiple sagas must be **published** (via `SendAsync` or `PublishAsync`), not **invoked** (via `InvokeAsync`). `InvokeAsync` bypasses message routing and will not reach the separated saga endpoints. ::: + +## Resequencer Saga + +Wolverine supports the [Resequencer](https://www.enterpriseintegrationpatterns.com/patterns/messaging/Resequencer.html) pattern +out of the box through the `ResequencerSaga` base class. This is useful when you need to process messages in a specific order, +but they may arrive out of sequence. + +Messages must implement the `SequencedMessage` interface: + +```cs +public interface SequencedMessage +{ + int? Order { get; } +} +``` + +Then subclass `ResequencerSaga` instead of `Saga`: + +```cs +public record StartMyWorkflow(Guid Id); + +public record MySequencedCommand(Guid SagaId, int? Order) : SequencedMessage; + +public class MyWorkflowSaga : ResequencerSaga +{ + public Guid Id { get; set; } + + public static MyWorkflowSaga Start(StartMyWorkflow cmd) + { + return new MyWorkflowSaga { Id = cmd.Id }; + } + + public void Handle(MySequencedCommand cmd) + { + // This will only be called when messages arrive in the correct order, + // or when out-of-order messages are replayed after gaps are filled + } +} +``` + +### How It Works + +Wolverine generates a `ShouldProceed()` guard around your `Handle`/`Orchestrate` methods: + +- If `Order` is `null` or `0`, the message bypasses the guard and is handled immediately +- If `Order` equals `LastSequence + 1` (the next expected sequence), the handler executes normally and `LastSequence` advances +- If `Order` is greater than `LastSequence + 1` (a gap exists), the message is added to the `Pending` list and the handler is **not** called +- When a gap-filling message arrives, any consecutive pending messages are automatically re-published to be handled in order + +The saga state is **always** persisted regardless of whether the handler was called, because the `Pending` list and `LastSequence` may have changed. + +### Key Properties + +| Property | Description | +|----------|-------------| +| `LastSequence` | The highest sequence number that has been processed in order | +| `Pending` | Messages received out of order, waiting for earlier messages to arrive | + +### Concurrency Considerations + +When using `ResequencerSaga`, we recommend also using [Partitioned Sequential Messaging](/guide/messaging/partitioning) to manage potential concurrency conflicts. When `UseInferredMessageGrouping()` is enabled, Wolverine automatically detects `SequencedMessage` types and uses the `Order` property as the group id for partitioning. Messages with `null` order values receive a random group id so they are distributed independently. + +```cs +using var host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.MessagePartitioning + // Automatically infers grouping from saga identity AND + // SequencedMessage.Order for resequencer sagas + .UseInferredMessageGrouping(); + }).StartAsync(); +``` diff --git a/docs/guide/messaging/partitioning.md b/docs/guide/messaging/partitioning.md index 06068a1a8..db2f388ee 100644 --- a/docs/guide/messaging/partitioning.md +++ b/docs/guide/messaging/partitioning.md @@ -169,8 +169,9 @@ opts.MessagePartitioning The built in rules *at this point* include: -* Using the Sage identity of a message that is handled by a [Stateful Saga](/guide/durability/sagas) +* Using the saga identity of a message that is handled by a [Stateful Saga](/guide/durability/sagas) * Using the stream/aggregate id of messages that are part of the [Aggregate Handler Workflow](/guide/durability/marten/event-sourcing) integration with Marten +* Using the `Order` property of messages that implement the `SequencedMessage` interface (used by [ResequencerSaga](/guide/durability/sagas#resequencer-saga)). Messages with a `null` order value receive a random group id so they are distributed independently ## Specifying Grouping Rules diff --git a/src/Persistence/MartenTests/Saga/resequencer_saga_end_to_end.cs b/src/Persistence/MartenTests/Saga/resequencer_saga_end_to_end.cs new file mode 100644 index 000000000..be85df315 --- /dev/null +++ b/src/Persistence/MartenTests/Saga/resequencer_saga_end_to_end.cs @@ -0,0 +1,138 @@ +using IntegrationTests; +using JasperFx.Resources; +using Marten; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; +using Xunit; + +namespace MartenTests.Saga; + +public record StartMartenSequencedSaga(Guid Id); + +public record MartenSequencedCommand(Guid SagaId, int? Order) : SequencedMessage; + +public class MartenTestResequencerSaga : ResequencerSaga +{ + public Guid Id { get; set; } + public List ProcessedOrders { get; set; } = new(); + + public static MartenTestResequencerSaga Start(StartMartenSequencedSaga cmd) + { + return new MartenTestResequencerSaga { Id = cmd.Id }; + } + + public void Handle(MartenSequencedCommand cmd) + { + ProcessedOrders.Add(cmd.Order); + } +} + +public class resequencer_saga_end_to_end : PostgresqlContext, IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "resequencer_sagas"; + }).IntegrateWithWolverine(); + + opts.Services.AddResourceSetupOnStartup(); + + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(); + }) + .StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + private async Task LoadState(Guid id) + { + using var session = _host.DocumentStore().QuerySession(); + return await session.LoadAsync(id); + } + + [Fact] + public async Task messages_in_order_are_all_handled() + { + var sagaId = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartMartenSequencedSaga(sagaId)); + await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 1)); + await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 2)); + await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 3)); + + var state = await LoadState(sagaId); + state.ShouldNotBeNull(); + state.ProcessedOrders.ShouldBe([1, 2, 3]); + state.LastSequence.ShouldBe(3); + state.Pending.ShouldBeEmpty(); + } + + [Fact] + public async Task out_of_order_message_is_queued_not_handled() + { + var sagaId = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartMartenSequencedSaga(sagaId)); + await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 3)); + + var state = await LoadState(sagaId); + state.ShouldNotBeNull(); + state.ProcessedOrders.ShouldBeEmpty(); + state.LastSequence.ShouldBe(0); + state.Pending.Count.ShouldBe(1); + } + + [Fact] + public async Task out_of_order_messages_replayed_when_gap_fills() + { + var sagaId = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartMartenSequencedSaga(sagaId)); + // Send message 2 first, it will be queued in Pending + await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, 2)); + + // Send message 1 which fills the gap - ShouldProceed will republish 2 + await _host.ExecuteAndWaitAsync(async () => + { + await _host.Services.GetRequiredService() + .PublishAsync(new MartenSequencedCommand(sagaId, 1)); + }, timeoutInMilliseconds: 30000); + + var state = await LoadState(sagaId); + state.ShouldNotBeNull(); + state.LastSequence.ShouldBe(2); + state.Pending.ShouldBeEmpty(); + state.ProcessedOrders.ShouldContain(1); + state.ProcessedOrders.ShouldContain(2); + } + + [Fact] + public async Task null_order_bypasses_guard() + { + var sagaId = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartMartenSequencedSaga(sagaId)); + await _host.InvokeMessageAndWaitAsync(new MartenSequencedCommand(sagaId, null)); + + var state = await LoadState(sagaId); + state.ShouldNotBeNull(); + state.ProcessedOrders.ShouldBe([null]); + } +} diff --git a/src/Testing/CoreTests/Persistence/Sagas/resequencer_saga_in_memory.cs b/src/Testing/CoreTests/Persistence/Sagas/resequencer_saga_in_memory.cs new file mode 100644 index 000000000..eedd0c30e --- /dev/null +++ b/src/Testing/CoreTests/Persistence/Sagas/resequencer_saga_in_memory.cs @@ -0,0 +1,144 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Persistence.Sagas; +using Wolverine.Tracking; +using Wolverine.Transports; +using Xunit; + +namespace CoreTests.Persistence.Sagas; + +public record StartSequencedSaga(Guid Id); + +public record SequencedCommand(Guid SagaId, int? Order) : SequencedMessage; + +public class TestResequencerSaga : ResequencerSaga +{ + public Guid Id { get; set; } + public List ProcessedOrders { get; set; } = new(); + + public static TestResequencerSaga Start(StartSequencedSaga cmd) + { + return new TestResequencerSaga { Id = cmd.Id }; + } + + public void Handle(SequencedCommand cmd) + { + ProcessedOrders.Add(cmd.Order); + } +} + +public class resequencer_saga_in_memory : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(); + + opts.PublishAllMessages().To(TransportConstants.LocalUri); + }) + .StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + private async Task LoadState(Guid id) + { + return _host.Services.GetRequiredService() + .Load(id); + } + + [Fact] + public async Task messages_in_order_are_all_handled() + { + var sagaId = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId)); + await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 1)); + await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 2)); + await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 3)); + + var state = await LoadState(sagaId); + state.ShouldNotBeNull(); + state.ProcessedOrders.ShouldBe([1, 2, 3]); + state.LastSequence.ShouldBe(3); + state.Pending.ShouldBeEmpty(); + } + + [Fact] + public async Task out_of_order_message_is_queued_not_handled() + { + var sagaId = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId)); + await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 3)); + + var state = await LoadState(sagaId); + state.ShouldNotBeNull(); + state.ProcessedOrders.ShouldBeEmpty(); + state.LastSequence.ShouldBe(0); + state.Pending.Count.ShouldBe(1); + } + + [Fact] + public async Task out_of_order_messages_replayed_when_gap_fills() + { + var sagaId = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId)); + await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 3)); + await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 2)); + + // Send message 1 which fills the gap - ShouldProceed will republish 2 and 3 + // The tracked session will wait for all cascading messages to complete + await _host.ExecuteAndWaitAsync(async () => + { + await _host.Services.GetRequiredService() + .PublishAsync(new SequencedCommand(sagaId, 1)); + }, timeoutInMilliseconds: 30000); + + var state = await LoadState(sagaId); + state.ShouldNotBeNull(); + state.LastSequence.ShouldBe(3); + state.Pending.ShouldBeEmpty(); + state.ProcessedOrders.ShouldContain(1); + state.ProcessedOrders.ShouldContain(2); + state.ProcessedOrders.ShouldContain(3); + } + + [Fact] + public async Task null_order_bypasses_guard() + { + var sagaId = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId)); + await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, null)); + + var state = await LoadState(sagaId); + state.ShouldNotBeNull(); + state.ProcessedOrders.ShouldBe([null]); + } + + [Fact] + public async Task zero_order_bypasses_guard() + { + var sagaId = Guid.NewGuid(); + + await _host.InvokeMessageAndWaitAsync(new StartSequencedSaga(sagaId)); + await _host.InvokeMessageAndWaitAsync(new SequencedCommand(sagaId, 0)); + + var state = await LoadState(sagaId); + state.ShouldNotBeNull(); + state.ProcessedOrders.ShouldBe([0]); + } +} diff --git a/src/Testing/CoreTests/Runtime/Partitioning/MessagePartitioningRulesTests.cs b/src/Testing/CoreTests/Runtime/Partitioning/MessagePartitioningRulesTests.cs index 540a8fda8..4715accfb 100644 --- a/src/Testing/CoreTests/Runtime/Partitioning/MessagePartitioningRulesTests.cs +++ b/src/Testing/CoreTests/Runtime/Partitioning/MessagePartitioningRulesTests.cs @@ -1,4 +1,6 @@ using JasperFx.Core.Reflection; +using Shouldly; +using Wolverine; using Wolverine.ComplianceTests; using Wolverine.Runtime.Partitioning; using Xunit; @@ -75,8 +77,48 @@ public void by_specific_messages_and_properties() envelope.Message = new Coffee3("Starbucks"); rules.DetermineGroupId(envelope).ShouldBe("Starbucks"); } + [Fact] + public void sequenced_message_with_order_uses_order_as_group_id() + { + var rule = new SequencedMessageGroupingRule(typeof(TestSequencedMsg)); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new TestSequencedMsg(5); + + rule.TryFindIdentity(envelope, out var groupId).ShouldBeTrue(); + groupId.ShouldBe("5"); + } + + [Fact] + public void sequenced_message_with_null_order_gets_random_group_id() + { + var rule = new SequencedMessageGroupingRule(typeof(TestSequencedMsg)); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new TestSequencedMsg(null); + + rule.TryFindIdentity(envelope, out var groupId).ShouldBeTrue(); + groupId.ShouldNotBeNullOrEmpty(); + + // Should get a different random id each time + rule.TryFindIdentity(envelope, out var groupId2).ShouldBeTrue(); + groupId2.ShouldNotBe(groupId); + } + + [Fact] + public void sequenced_message_rule_does_not_match_wrong_type() + { + var rule = new SequencedMessageGroupingRule(typeof(TestSequencedMsg)); + + var envelope = ObjectMother.Envelope(); + envelope.Message = new Coffee1("Dark", "Paul Newman's"); + + rule.TryFindIdentity(envelope, out _).ShouldBeFalse(); + } } +public record TestSequencedMsg(int? Order) : SequencedMessage; + public interface ICoffee { string Name { get; } diff --git a/src/Wolverine/Persistence/Sagas/SagaChain.cs b/src/Wolverine/Persistence/Sagas/SagaChain.cs index 8606478c9..a9fe4c9e4 100644 --- a/src/Wolverine/Persistence/Sagas/SagaChain.cs +++ b/src/Wolverine/Persistence/Sagas/SagaChain.cs @@ -242,7 +242,7 @@ internal override List DetermineFrames(GenerationRules rules, IServiceCon } else { - generateCodeForMaybeExisting(container, frameProvider, list); + generateCodeForMaybeExisting(container, frameProvider, list, messageVariable); } // .Concat(handlerReturnValueFrames) @@ -251,7 +251,7 @@ internal override List DetermineFrames(GenerationRules rules, IServiceCon } private void generateCodeForMaybeExisting(IServiceContainer container, IPersistenceFrameProvider frameProvider, - List frames) + List frames, MessageVariable? messageVariable = null) { var findSagaId = SagaIdMember == null ? (Frame)new PullSagaIdFromEnvelopeFrame(frameProvider.DetermineSagaIdType(SagaType, container)) @@ -267,7 +267,7 @@ private void generateCodeForMaybeExisting(IServiceContainer container, IPersiste var sagaId = resolve.SagaId; var startingFrames = DetermineSagaDoesNotExistSteps(sagaId, saga, frameProvider, container).ToArray(); - var existingFrames = DetermineSagaExistsSteps(sagaId, saga, frameProvider, container).ToArray(); + var existingFrames = DetermineSagaExistsSteps(sagaId, saga, frameProvider, container, messageVariable).ToArray(); var ifNullBlock = new IfElseNullGuardFrame(saga, startingFrames, existingFrames); @@ -362,15 +362,26 @@ private static Frame buildFrameForConditionalInsert(Variable saga, IPersistenceF } internal IEnumerable DetermineSagaExistsSteps(Variable sagaId, Variable saga, - IPersistenceFrameProvider frameProvider, IServiceContainer container) + IPersistenceFrameProvider frameProvider, IServiceContainer container, MessageVariable? messageVariable = null) { // Set the saga ID on the context so cascading messages have the correct saga ID yield return new SetSagaIdFrame(sagaId); + var handlerFrames = new List(); foreach (var call in ExistingCalls) { - yield return call; - foreach (var frame in call.Creates.SelectMany(x => x.ReturnAction(this).Frames())) yield return frame; + handlerFrames.Add(call); + foreach (var frame in call.Creates.SelectMany(x => x.ReturnAction(this).Frames())) handlerFrames.Add(frame); + } + + // For ResequencerSaga, wrap handler calls with ShouldProceed() guard + if (messageVariable != null && IsResequencerGuarded) + { + yield return new ShouldProceedGuardFrame(saga, messageVariable, handlerFrames.ToArray()); + } + else + { + foreach (var frame in handlerFrames) yield return frame; } var update = frameProvider.DetermineUpdateFrame(saga, container); @@ -380,4 +391,25 @@ internal IEnumerable DetermineSagaExistsSteps(Variable sagaId, Variable s yield return frameProvider.CommitUnitOfWorkFrame(saga, container); } + + private bool IsResequencerGuarded + { + get + { + var resequencerMessageType = GetResequencerMessageType(); + return resequencerMessageType != null && MessageType.CanBeCastTo(resequencerMessageType); + } + } + + private Type? GetResequencerMessageType() + { + var current = SagaType; + while (current != null) + { + if (current.IsGenericType && current.GetGenericTypeDefinition() == typeof(ResequencerSaga<>)) + return current.GetGenericArguments()[0]; + current = current.BaseType; + } + return null; + } } diff --git a/src/Wolverine/Persistence/Sagas/ShouldProceedGuardFrame.cs b/src/Wolverine/Persistence/Sagas/ShouldProceedGuardFrame.cs new file mode 100644 index 000000000..704e89fb1 --- /dev/null +++ b/src/Wolverine/Persistence/Sagas/ShouldProceedGuardFrame.cs @@ -0,0 +1,60 @@ +using JasperFx.CodeGeneration; +using JasperFx.CodeGeneration.Frames; +using JasperFx.CodeGeneration.Model; +using Wolverine.Runtime; + +namespace Wolverine.Persistence.Sagas; + +/// +/// Code generation frame that wraps handler calls with a ResequencerSaga.ShouldProceed() guard. +/// The saga is always persisted (because Pending may change), but handler calls only execute +/// if ShouldProceed returns true. +/// +internal class ShouldProceedGuardFrame : Frame +{ + private readonly Variable _saga; + private readonly Variable _message; + private readonly Frame[] _innerFrames; + private Variable? _context; + + public ShouldProceedGuardFrame(Variable saga, Variable message, Frame[] innerFrames) : base(true) + { + _saga = saga; + _message = message; + _innerFrames = innerFrames; + uses.Add(_saga); + uses.Add(_message); + } + + public override IEnumerable FindVariables(IMethodVariables chain) + { + _context = chain.FindVariable(typeof(MessageContext)); + yield return _context; + + foreach (var inner in _innerFrames) + { + foreach (var variable in inner.FindVariables(chain)) + { + yield return variable; + } + } + } + + public override void GenerateCode(GeneratedMethod method, ISourceWriter writer) + { + writer.WriteComment("Resequencer guard - check message ordering"); + writer.Write( + $"var shouldProceed = await {_saga.Usage}.{nameof(ResequencerSaga.ShouldProceed)}({_message.Usage}, {_context!.Usage}).ConfigureAwait(false);"); + writer.Write("BLOCK:if (shouldProceed)"); + + foreach (var inner in _innerFrames) + { + inner.GenerateCode(method, writer); + } + + writer.FinishBlock(); + writer.BlankLine(); + + Next?.GenerateCode(method, writer); + } +} diff --git a/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs b/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs index 26ac3646e..ef440c3be 100644 --- a/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs +++ b/src/Wolverine/Runtime/Partitioning/MessagePartitioningRules.cs @@ -187,6 +187,13 @@ internal void MaybeInferGrouping(HandlerGraph handlerGraph) { ByMessage(chain.MessageType, property); } + + // For messages implementing SequencedMessage, use Order as the group id + // so that messages with the same sequence number are partitioned together + if (chain.MessageType.CanBeCastTo()) + { + _rules.Add(new SequencedMessageGroupingRule(chain.MessageType)); + } } } } @@ -233,9 +240,35 @@ public Grouper(PropertyInfo groupMember) public string ToGroupId(object message) { var raw = _source((TConcrete)message); - - // If it's empty, it will get randomly sorted + + // If it's empty, it will get randomly sorted // into the partitioned slots return raw?.ToString() ?? string.Empty; } +} + +/// +/// Grouping rule for SequencedMessage types that uses the Order property as the group id. +/// Messages with null or zero Order get a random group id so they are not partitioned together. +/// +internal class SequencedMessageGroupingRule : IGroupingRule +{ + private readonly Type _messageType; + + public SequencedMessageGroupingRule(Type messageType) + { + _messageType = messageType; + } + + public bool TryFindIdentity(Envelope envelope, out string groupId) + { + if (envelope.Message is SequencedMessage sequenced && envelope.Message.GetType() == _messageType) + { + groupId = sequenced.Order?.ToString() ?? Guid.NewGuid().ToString(); + return true; + } + + groupId = default!; + return false; + } } \ No newline at end of file diff --git a/src/Wolverine/Saga.cs b/src/Wolverine/Saga.cs index cd4ea73dc..30befe932 100644 --- a/src/Wolverine/Saga.cs +++ b/src/Wolverine/Saga.cs @@ -45,4 +45,62 @@ public class SagaConcurrencyException : Exception public SagaConcurrencyException(string message) : base(message) { } +} + +public interface SequencedMessage +{ + int? Order { get; } +} + +// Use code gen to "know" how to get the sequence? +public abstract class ResequencerSaga : Saga where T : SequencedMessage +{ + public List Pending { get; set; } = new(); + public int LastSequence { get; set; } + + // We'll enhance the code gen to use this around the Saga handling. So this would wrap + // around the call to the actual Handle method as a guard clause, but the saga still gets persisted + public async ValueTask ShouldProceed(T message, IMessageBus bus) + { + // TODO -- probably want a Timeout around this? + + // If there is no order, do you just let it go? Or zero? + if (!message.Order.HasValue || message.Order == 0) + { + return true; + } + + // Already processed in sequence, allow re-published messages through + if (message.Order.Value <= LastSequence) + { + return true; + } + + if (message.Order.Value != LastSequence + 1) + { + Pending.Add(message); + return false; + } + + // It can go ahead + LastSequence = message.Order.Value; + + // Try to recover any pending messages + while (Pending.Any()) + { + var next = Pending.FirstOrDefault(x => x.Order.HasValue && x.Order.Value == LastSequence + 1); + if (next == null) + { + break; + } + + Pending.Remove(next); + LastSequence = next.Order.Value; + + // This doesn't actually go out until the original message completes + await bus.PublishAsync(next); + } + + return true; + } } \ No newline at end of file