diff --git a/src/Persistence/MartenTests/TestHelpers/catch_up_then_restart.cs b/src/Persistence/MartenTests/TestHelpers/catch_up_then_restart.cs index efdd28f86..910530d5d 100644 --- a/src/Persistence/MartenTests/TestHelpers/catch_up_then_restart.cs +++ b/src/Persistence/MartenTests/TestHelpers/catch_up_then_restart.cs @@ -70,7 +70,11 @@ public async Task with_main_store() var tracked = await _host.TrackActivity() + + // This new helper just resets the main Marten store + // Equivalent to calling IHost.ResetAllMartenDataAsync() .ResetAllMartenDataFirst() + .PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndResumeNormally) .InvokeMessageAndWaitAsync(new AppendLetters(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); diff --git a/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineOutbox.cs b/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineOutbox.cs index 44c85d4f8..e1a2373ec 100644 --- a/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineOutbox.cs +++ b/src/Persistence/Wolverine.Marten/Publishing/MartenToWolverineOutbox.cs @@ -18,7 +18,11 @@ public MartenToWolverineOutbox(IServiceProvider services) public async ValueTask CreateBatch(DocumentSessionBase session) { - var context = new MessageContext(_runtime.Value, session.TenantId); + var context = new MessageContext(_runtime.Value, session.TenantId) + { + MultiFlushMode = MultiFlushMode.AllowMultiples + }; + await context.EnlistInOutboxAsync(new MartenEnvelopeTransaction(session, context)); return new MartenToWolverineMessageBatch(context, session); diff --git a/src/Testing/CoreTests/Runtime/MessageContextTests.cs b/src/Testing/CoreTests/Runtime/MessageContextTests.cs index 0f1f8036e..f3cd021f7 100644 --- a/src/Testing/CoreTests/Runtime/MessageContextTests.cs +++ b/src/Testing/CoreTests/Runtime/MessageContextTests.cs @@ -8,6 +8,7 @@ using Wolverine.Runtime; using Wolverine.Tracking; using Wolverine.Transports; +using Wolverine.Transports.Sending; using Wolverine.Transports.Tcp; using Wolverine.Util; using Xunit; @@ -39,6 +40,82 @@ public MessageContextTests() theEnvelope = ObjectMother.Envelope(); } + [Fact] + public void default_multi_flush_mode_is_OnlyOnce() + { + theContext.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce); + } + + [Fact] + public async Task try_to_call_flush_outgoing_twice_in_normal_mode() + { + theEnvelope.Sender = Substitute.For(); + + // Wacky, but this adds it to the MessageContext + await theContext.As().PersistOutgoingAsync(theEnvelope); + + // Point is that nothing happens, no exceptions + await theContext.FlushOutgoingMessagesAsync(); + + var envelope2 = ObjectMother.Envelope(); + envelope2.Sender = Substitute.For(); + + // Wacky, but this adds it to the MessageContext + await theContext.As().PersistOutgoingAsync(envelope2); + await theContext.FlushOutgoingMessagesAsync(); + + await envelope2.Sender.DidNotReceive().StoreAndForwardAsync(envelope2); + } + + [Fact] + public async Task try_to_call_flush_outgoing_twice_in_allow_multiples_mode() + { + theContext.MultiFlushMode = MultiFlushMode.AllowMultiples; + + theEnvelope.Sender = Substitute.For(); + + // Wacky, but this adds it to the MessageContext + await theContext.As().PersistOutgoingAsync(theEnvelope); + + // Point is that nothing happens, no exceptions + await theContext.FlushOutgoingMessagesAsync(); + + var envelope2 = ObjectMother.Envelope(); + envelope2.Sender = Substitute.For(); + + // Wacky, but this adds it to the MessageContext + await theContext.As().PersistOutgoingAsync(envelope2); + await theContext.FlushOutgoingMessagesAsync(); + + await envelope2.Sender.Received().StoreAndForwardAsync(envelope2); + } + + [Fact] + public async Task throw_on_multiple_calls_to_flush_if_the_mode_is_assert_on_multiples() + { + theContext.MultiFlushMode = MultiFlushMode.AssertOnMultiples; + + + theEnvelope.Sender = Substitute.For(); + + // Wacky, but this adds it to the MessageContext + await theContext.As().PersistOutgoingAsync(theEnvelope); + await theContext.FlushOutgoingMessagesAsync(); + + // Wacky, but this adds it to the MessageContext + var envelope2 = ObjectMother.Envelope(); + envelope2.Sender = Substitute.For(); + + // Wacky, but this adds it to the MessageContext + await theContext.As().PersistOutgoingAsync(envelope2); + await Should.ThrowAsync(async () => + { + await theContext.FlushOutgoingMessagesAsync(); + }); + + await envelope2.Sender.DidNotReceive().StoreAndForwardAsync(envelope2); + } + [Fact] public async Task reject_side_effect_as_cascading_message() { diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index 9533210bc..3e61d52e8 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -12,6 +12,25 @@ namespace Wolverine.Runtime; +public enum MultiFlushMode +{ + /// + /// The default mode, additional calls to FlushOutgoingMessages() are ignored + /// + OnlyOnce, + + /// + /// Allow for multiple calls to FlushOutgoingMessages() + /// + AllowMultiples, + + /// + /// Throw an exception on additional calls to FlushOutgoingMessages(). Use this to troubleshoot + /// erroneous behavior + /// + AssertOnMultiples +} + public class MessageContext : MessageBus, IMessageContext, IHasTenantId, IEnvelopeTransaction, IEnvelopeLifecycle { private IChannelCallback? _channel; @@ -29,6 +48,12 @@ public MessageContext(IWolverineRuntime runtime, string tenantId) : base(runtime TenantId = runtime.Options.Durability.TenantIdStyle.MaybeCorrectTenantId(tenantId); } + /// + /// Governs how the MessageContext will handle subsequent calls to FlushOutgoingMessages(). The + /// default behavior is to quietly ignore any additional calls + /// + public MultiFlushMode MultiFlushMode { get; set; } = MultiFlushMode.OnlyOnce; + internal IList Scheduled { get; } = new List(); private bool hasRequestedReply() @@ -45,9 +70,22 @@ public async Task FlushOutgoingMessagesAsync() { if (_hasFlushed) { - return; + switch (MultiFlushMode) + { + case MultiFlushMode.OnlyOnce: + return; + + case MultiFlushMode.AllowMultiples: + Runtime.Logger.LogWarning("Received multiple calls to FlushOutgoingMessagesAsync() to a single MessageContext"); + break; + + case MultiFlushMode.AssertOnMultiples: + throw new InvalidOperationException( + $"This MessageContext does not allow multiple calls to {nameof(FlushOutgoingMessagesAsync)} because {nameof(MultiFlushMode)} = {MultiFlushMode}"); + } } + await AssertAnyRequiredResponseWasGenerated(); if (!Outstanding.Any())