Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
using System.Diagnostics;
using IntegrationTests;
using JasperFx;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Daemon;
using JasperFx.Events.Projections;
using JasperFx.Resources;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.Marten;
using Wolverine.Tracking;

namespace MartenTests.AncillaryStores;

/// <summary>
/// GH-2529: Investigation test for the silent-hang concern when combining
/// (a) a Marten ancillary document store with IntegrateWithWolverine,
/// (b) the async daemon running on that ancillary store,
/// (c) a multi-stream projection that publishes Wolverine messages via
/// RaiseSideEffects().
///
/// If this test passes consistently, the suspected silent failure is not
/// reproducible at the framework integration level. If it hangs or fails,
/// we have a precise reproducer to drive the fix.
/// </summary>
public class multi_stream_projection_with_side_effects_on_ancillary_store : IAsyncLifetime
{
private IHost _host = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Primary Marten store + Wolverine integration
opts.Services.AddMarten(m =>
{
m.Connection(Servers.PostgresConnectionString);
m.DatabaseSchemaName = "issue2529_main";
m.Events.DatabaseSchemaName = "issue2529_main";
m.DisableNpgsqlLogging = true;
}).IntegrateWithWolverine();

opts.Durability.Mode = DurabilityMode.Solo;
opts.Policies.AutoApplyTransactions();

// Ancillary store with async daemon + multi-stream projection
opts.Services.AddMartenStore<IIssue2529Store>(sp =>
{
var storeOptions = new StoreOptions();
storeOptions.Connection(Servers.PostgresConnectionString);
storeOptions.DatabaseSchemaName = "issue2529_ancillary";
storeOptions.Events.DatabaseSchemaName = "issue2529_ancillary";
storeOptions.DisableNpgsqlLogging = true;

// The projection that calls RaiseSideEffects → publishes a Wolverine message
storeOptions.Projections.Add<Issue2529CounterProjection>(ProjectionLifecycle.Async);
return storeOptions;
})
.IntegrateWithWolverine()
.AddAsyncDaemon(DaemonMode.Solo);

opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState);

opts.Discovery.DisableConventionalDiscovery()
.IncludeType<Issue2529SideEffectHandler>();
}).StartAsync();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

private async Task AppendAndWaitForProjectionAsync(params Guid[] streamIds)
{
using var scope = _host.Services.CreateScope();
var store = scope.ServiceProvider.GetRequiredService<IIssue2529Store>();

await using (var session = store.LightweightSession())
{
foreach (var id in streamIds)
{
session.Events.StartStream<Issue2529Counter>(id, new IncrementCounter());
}
await session.SaveChangesAsync();
}

// Make sure the async daemon has caught up to all the events we just wrote.
// This rules out "the daemon is just slow" as the cause of side-effect non-delivery.
using var daemon = await store.BuildProjectionDaemonAsync();
await daemon.WaitForNonStaleData(60.Seconds());
}

[Fact]
public async Task projection_side_effect_message_reaches_wolverine_handler()
{
var streamId = Guid.NewGuid();

Issue2529SideEffectHandler.SeenStreamIds.Clear();

var tracked = await _host
.TrackActivity()
.Timeout(60.Seconds())
.IncludeExternalTransports()
.WaitForMessageToBeReceivedAt<CounterIncremented>(_host)
.ExecuteAndWaitAsync((Func<IMessageContext, Task>)(_ => AppendAndWaitForProjectionAsync(streamId)));

tracked.Executed.MessagesOf<CounterIncremented>()
.Where(m => m.StreamId == streamId)
.ShouldHaveSingleItem();
}

// GH-2529: Was failing because concurrent slice processing in Marten's
// AggregationRunner corrupted MessageContext._outstanding (concurrent
// List<Envelope>.Add is not thread-safe), dropping most side-effect messages
// with no exception. Fixed by adding a lock around _outstanding mutations
// in MessageBus / MessageContext.
[Fact]
public async Task multiple_side_effects_in_one_batch_all_reach_wolverine()
{
var streamIds = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() };

Issue2529SideEffectHandler.SeenStreamIds.Clear();

var tracked = await _host
.TrackActivity()
.Timeout(60.Seconds())
.IncludeExternalTransports()
.ExecuteAndWaitAsync((Func<IMessageContext, Task>)(_ => AppendAndWaitForProjectionAsync(streamIds)));

var ourStreamIds = streamIds.ToHashSet();
var seen = tracked.Executed.MessagesOf<CounterIncremented>()
.Where(m => ourStreamIds.Contains(m.StreamId))
.Select(m => m.StreamId)
.OrderBy(x => x)
.ToArray();

seen.ShouldBe(streamIds.OrderBy(x => x).ToArray());
}
}

// ── Marker interface for the ancillary store ──
public interface IIssue2529Store : IDocumentStore;

// ── Domain ──
public record IncrementCounter;

public class Issue2529Counter
{
public Guid Id { get; set; }
public int Count { get; set; }
}

// ── Multi-stream projection that publishes side effects ──
public class Issue2529CounterProjection : MultiStreamProjection<Issue2529Counter, Guid>
{
public Issue2529CounterProjection()
{
Identity<IEvent>(x => x.StreamId);
}

public static Issue2529Counter Create(IncrementCounter @event, IEvent metadata) =>
new() { Id = metadata.StreamId, Count = 1 };

public void Apply(Issue2529Counter counter, IncrementCounter _) => counter.Count++;

public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<Issue2529Counter> slice)
{
if (slice.Aggregate is not null)
{
slice.PublishMessage(new CounterIncremented(slice.Aggregate.Id, slice.Aggregate.Count));
}
return ValueTask.CompletedTask;
}
}

public record CounterIncremented(Guid StreamId, int Count);

// ── Wolverine handler — records what it saw so the test can assert ──
public class Issue2529SideEffectHandler
{
public static Guid LastSeenStreamId = Guid.Empty;
public static readonly List<Guid> SeenStreamIds = new();

public static void Handle(CounterIncremented msg)
{
LastSeenStreamId = msg.StreamId;
lock (SeenStreamIds) SeenStreamIds.Add(msg.StreamId);
Debug.WriteLine($"Side effect received for stream {msg.StreamId} (count={msg.Count})");
}
}
30 changes: 27 additions & 3 deletions src/Wolverine/Runtime/MessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ public static MessageBus Build(IWolverineRuntime runtime, string correlationId)

// ReSharper disable once InconsistentNaming
protected readonly List<Envelope> _outstanding = new();

// Protects _outstanding from concurrent mutation. The Marten async daemon's
// multi-stream projection runner can call PublishAsync on the same MessageContext
// from many slices in parallel (Block parallelism = 10). Without this lock,
// concurrent List<Envelope>.Add silently corrupts the list and drops messages.
// GH-2529.
// ReSharper disable once InconsistentNaming
protected readonly object _outstandingLock = new();
private string? _tenantId;

public MessageBus(IWolverineRuntime runtime) : this(runtime, Activity.Current?.RootId ?? Guid.NewGuid().ToString())
Expand Down Expand Up @@ -57,7 +65,17 @@ public virtual Task ReScheduleCurrentAsync(DateTimeOffset rescheduledAt)
public IWolverineRuntime Runtime { get; }
public IMessageStore Storage { get; internal set; }

public IEnumerable<Envelope> Outstanding => _outstanding;
/// <summary>
/// Snapshot of envelopes published in this context that have not yet been flushed.
/// Returns a copy to avoid concurrent-enumeration issues — see <see cref="_outstandingLock"/>.
/// </summary>
public IEnumerable<Envelope> Outstanding
{
get
{
lock (_outstandingLock) return _outstanding.ToArray();
}
}

public IEnvelopeTransaction? Transaction { get; protected set; }
public Guid ConversationId { get; protected set; }
Expand Down Expand Up @@ -267,7 +285,10 @@ internal async ValueTask PersistOrSendAsync(Envelope envelope)

if (Transaction is not null)
{
_outstanding.Fill(envelope);
lock (_outstandingLock)
{
_outstanding.Fill(envelope);
}

await envelope.PersistAsync(Transaction);

Expand Down Expand Up @@ -366,7 +387,10 @@ internal async ValueTask PersistOrSendAsync(params Envelope[] outgoing)

await Transaction.PersistAsync(envelopes);

_outstanding.Fill(outgoing);
lock (_outstandingLock)
{
_outstanding.Fill(outgoing);
}
}
else
{
Expand Down
63 changes: 49 additions & 14 deletions src/Wolverine/Runtime/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,17 @@ public async Task FlushOutgoingMessagesAsync()

await AssertAnyRequiredResponseWasGenerated();

if (_outstanding.Count == 0)
// Snapshot under lock so concurrent publishes from a Marten projection
// (Block parallelism = 10 in AggregationRunner) cannot corrupt the list
// while we're iterating it. GH-2529.
Envelope[] outgoing;
lock (_outstandingLock)
{
return;
if (_outstanding.Count == 0) return;
outgoing = _outstanding.ToArray();
}

foreach (var envelope in Outstanding)
foreach (var envelope in outgoing)
{
// https://github.com/JasperFx/wolverine/issues/2006
if (envelope == null) continue;
Expand Down Expand Up @@ -209,8 +214,11 @@ public async Task FlushOutgoingMessagesAsync()
}

_sent ??= new();
_sent.AddRange(_outstanding);
_outstanding.Clear();
lock (_outstandingLock)
{
_sent.AddRange(_outstanding);
_outstanding.Clear();
}

_hasFlushed = true;

Expand All @@ -236,10 +244,17 @@ public async Task AssertAnyRequiredResponseWasGenerated()
if (isMissingRequestedReply())
{
var failureDescription = $"No response was created for expected response '{Envelope!.ReplyRequested}' back to reply-uri {Envelope.ReplyUri}. ";
if (_outstanding.Count > 0)

Envelope[] outstandingSnapshot;
lock (_outstandingLock)
{
var types = new List<string>(_outstanding.Count + (_sent?.Count ?? 0));
foreach (var e in _outstanding) types.Add(e.MessageType!);
outstandingSnapshot = _outstanding.ToArray();
}

if (outstandingSnapshot.Length > 0)
{
var types = new List<string>(outstandingSnapshot.Length + (_sent?.Count ?? 0));
foreach (var e in outstandingSnapshot) types.Add(e.MessageType!);
if (_sent != null)
{
foreach (var e in _sent) types.Add(e.MessageType!);
Expand Down Expand Up @@ -458,13 +473,19 @@ public async ValueTask SendFailureAcknowledgementAsync(string failureDescription

Task IEnvelopeTransaction.PersistOutgoingAsync(Envelope envelope)
{
_outstanding.Fill(envelope);
lock (_outstandingLock)
{
_outstanding.Fill(envelope);
}
return Task.CompletedTask;
}

Task IEnvelopeTransaction.PersistOutgoingAsync(Envelope[] envelopes)
{
_outstanding.Fill(envelopes);
lock (_outstandingLock)
{
_outstanding.Fill(envelopes);
}
return Task.CompletedTask;
}

Expand Down Expand Up @@ -519,7 +540,12 @@ public override async Task ReScheduleCurrentAsync(DateTimeOffset rescheduledAt)

internal async Task CopyToAsync(IEnvelopeTransaction other)
{
await other.PersistOutgoingAsync(_outstanding.ToArray());
Envelope[] snapshot;
lock (_outstandingLock)
{
snapshot = _outstanding.ToArray();
}
await other.PersistOutgoingAsync(snapshot);

foreach (var envelope in Scheduled) await other.PersistIncomingAsync(envelope);
}
Expand All @@ -530,7 +556,10 @@ internal async Task CopyToAsync(IEnvelopeTransaction other)
public async ValueTask ClearAllAsync()
{
Scheduled.Clear();
_outstanding.Clear();
lock (_outstandingLock)
{
_outstanding.Clear();
}

if (Transaction != null)
{
Expand Down Expand Up @@ -649,7 +678,10 @@ internal void ClearState()
_hasFlushed = false;

_sent?.Clear();
_outstanding.Clear();
lock (_outstandingLock)
{
_outstanding.Clear();
}
Scheduled.Clear();
Envelope = null;
Transaction = null;
Expand Down Expand Up @@ -679,7 +711,10 @@ internal void ReadEnvelope(Envelope? originalEnvelope, IChannelCallback channel)
var ackEnvelope = Runtime.RoutingFor(typeof(Acknowledgement))
.RouteToDestination(ack, Envelope.ReplyUri, null);
TrackEnvelopeCorrelation(ackEnvelope, Activity.Current);
_outstanding.Add(ackEnvelope);
lock (_outstandingLock)
{
_outstanding.Add(ackEnvelope);
}
}
}

Expand Down
Loading