Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ public async Task with_main_store()


var tracked = await _host.TrackActivity()

// This new helper just resets the main Marten store
// Equivalent to calling IHost.ResetAllMartenDataAsync()
.ResetAllMartenDataFirst()

.PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndResumeNormally)
.InvokeMessageAndWaitAsync(new AppendLetters(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"]));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ public MartenToWolverineOutbox(IServiceProvider services)

public async ValueTask<IMessageBatch> CreateBatch(DocumentSessionBase session)
{
var context = new MessageContext(_runtime.Value, session.TenantId);
var context = new MessageContext(_runtime.Value, session.TenantId)
{
MultiFlushMode = MultiFlushMode.AllowMultiples
};

await context.EnlistInOutboxAsync(new MartenEnvelopeTransaction(session, context));

return new MartenToWolverineMessageBatch(context, session);
Expand Down
77 changes: 77 additions & 0 deletions src/Testing/CoreTests/Runtime/MessageContextTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Wolverine.Runtime;
using Wolverine.Tracking;
using Wolverine.Transports;
using Wolverine.Transports.Sending;
using Wolverine.Transports.Tcp;
using Wolverine.Util;
using Xunit;
Expand Down Expand Up @@ -39,6 +40,82 @@ public MessageContextTests()
theEnvelope = ObjectMother.Envelope();
}

[Fact]
public void default_multi_flush_mode_is_OnlyOnce()
{
theContext.MultiFlushMode.ShouldBe(MultiFlushMode.OnlyOnce);
}

[Fact]
public async Task try_to_call_flush_outgoing_twice_in_normal_mode()
{
theEnvelope.Sender = Substitute.For<ISendingAgent>();

// Wacky, but this adds it to the MessageContext
await theContext.As<IEnvelopeTransaction>().PersistOutgoingAsync(theEnvelope);

// Point is that nothing happens, no exceptions
await theContext.FlushOutgoingMessagesAsync();

var envelope2 = ObjectMother.Envelope();
envelope2.Sender = Substitute.For<ISendingAgent>();

// Wacky, but this adds it to the MessageContext
await theContext.As<IEnvelopeTransaction>().PersistOutgoingAsync(envelope2);
await theContext.FlushOutgoingMessagesAsync();

await envelope2.Sender.DidNotReceive().StoreAndForwardAsync(envelope2);
}

[Fact]
public async Task try_to_call_flush_outgoing_twice_in_allow_multiples_mode()
{
theContext.MultiFlushMode = MultiFlushMode.AllowMultiples;

theEnvelope.Sender = Substitute.For<ISendingAgent>();

// Wacky, but this adds it to the MessageContext
await theContext.As<IEnvelopeTransaction>().PersistOutgoingAsync(theEnvelope);

// Point is that nothing happens, no exceptions
await theContext.FlushOutgoingMessagesAsync();

var envelope2 = ObjectMother.Envelope();
envelope2.Sender = Substitute.For<ISendingAgent>();

// Wacky, but this adds it to the MessageContext
await theContext.As<IEnvelopeTransaction>().PersistOutgoingAsync(envelope2);
await theContext.FlushOutgoingMessagesAsync();

await envelope2.Sender.Received().StoreAndForwardAsync(envelope2);
}

[Fact]
public async Task throw_on_multiple_calls_to_flush_if_the_mode_is_assert_on_multiples()
{
theContext.MultiFlushMode = MultiFlushMode.AssertOnMultiples;


theEnvelope.Sender = Substitute.For<ISendingAgent>();

// Wacky, but this adds it to the MessageContext
await theContext.As<IEnvelopeTransaction>().PersistOutgoingAsync(theEnvelope);
await theContext.FlushOutgoingMessagesAsync();

// Wacky, but this adds it to the MessageContext
var envelope2 = ObjectMother.Envelope();
envelope2.Sender = Substitute.For<ISendingAgent>();

// Wacky, but this adds it to the MessageContext
await theContext.As<IEnvelopeTransaction>().PersistOutgoingAsync(envelope2);
await Should.ThrowAsync<InvalidOperationException>(async () =>
{
await theContext.FlushOutgoingMessagesAsync();
});

await envelope2.Sender.DidNotReceive().StoreAndForwardAsync(envelope2);
}

[Fact]
public async Task reject_side_effect_as_cascading_message()
{
Expand Down
40 changes: 39 additions & 1 deletion src/Wolverine/Runtime/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,25 @@

namespace Wolverine.Runtime;

public enum MultiFlushMode
{
/// <summary>
/// The default mode, additional calls to FlushOutgoingMessages() are ignored
/// </summary>
OnlyOnce,

/// <summary>
/// Allow for multiple calls to FlushOutgoingMessages()
/// </summary>
AllowMultiples,

/// <summary>
/// Throw an exception on additional calls to FlushOutgoingMessages(). Use this to troubleshoot
/// erroneous behavior
/// </summary>
AssertOnMultiples
}

public class MessageContext : MessageBus, IMessageContext, IHasTenantId, IEnvelopeTransaction, IEnvelopeLifecycle
{
private IChannelCallback? _channel;
Expand All @@ -29,6 +48,12 @@ public MessageContext(IWolverineRuntime runtime, string tenantId) : base(runtime
TenantId = runtime.Options.Durability.TenantIdStyle.MaybeCorrectTenantId(tenantId);
}

/// <summary>
/// Governs how the MessageContext will handle subsequent calls to FlushOutgoingMessages(). The
/// default behavior is to quietly ignore any additional calls
/// </summary>
public MultiFlushMode MultiFlushMode { get; set; } = MultiFlushMode.OnlyOnce;

internal IList<Envelope> Scheduled { get; } = new List<Envelope>();

private bool hasRequestedReply()
Expand All @@ -45,9 +70,22 @@ public async Task FlushOutgoingMessagesAsync()
{
if (_hasFlushed)
{
return;
switch (MultiFlushMode)
{
case MultiFlushMode.OnlyOnce:
return;

case MultiFlushMode.AllowMultiples:
Runtime.Logger.LogWarning("Received multiple calls to FlushOutgoingMessagesAsync() to a single MessageContext");
break;

case MultiFlushMode.AssertOnMultiples:
throw new InvalidOperationException(
$"This MessageContext does not allow multiple calls to {nameof(FlushOutgoingMessagesAsync)} because {nameof(MultiFlushMode)} = {MultiFlushMode}");
}
}


await AssertAnyRequiredResponseWasGenerated();

if (!Outstanding.Any())
Expand Down
Loading