diff --git a/src/Persistence/MartenTests/AncillaryStores/multi_stream_projection_with_side_effects_on_ancillary_store.cs b/src/Persistence/MartenTests/AncillaryStores/multi_stream_projection_with_side_effects_on_ancillary_store.cs new file mode 100644 index 000000000..ba834532e --- /dev/null +++ b/src/Persistence/MartenTests/AncillaryStores/multi_stream_projection_with_side_effects_on_ancillary_store.cs @@ -0,0 +1,200 @@ +using System.Diagnostics; +using IntegrationTests; +using JasperFx; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using JasperFx.Resources; +using Marten; +using Marten.Events; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.AncillaryStores; + +/// +/// GH-2529: Investigation test for the silent-hang concern when combining +/// (a) a Marten ancillary document store with IntegrateWithWolverine, +/// (b) the async daemon running on that ancillary store, +/// (c) a multi-stream projection that publishes Wolverine messages via +/// RaiseSideEffects(). +/// +/// If this test passes consistently, the suspected silent failure is not +/// reproducible at the framework integration level. If it hangs or fails, +/// we have a precise reproducer to drive the fix. +/// +public class multi_stream_projection_with_side_effects_on_ancillary_store : IAsyncLifetime +{ + private IHost _host = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Primary Marten store + Wolverine integration + opts.Services.AddMarten(m => + { + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "issue2529_main"; + m.Events.DatabaseSchemaName = "issue2529_main"; + m.DisableNpgsqlLogging = true; + }).IntegrateWithWolverine(); + + opts.Durability.Mode = DurabilityMode.Solo; + opts.Policies.AutoApplyTransactions(); + + // Ancillary store with async daemon + multi-stream projection + opts.Services.AddMartenStore(sp => + { + var storeOptions = new StoreOptions(); + storeOptions.Connection(Servers.PostgresConnectionString); + storeOptions.DatabaseSchemaName = "issue2529_ancillary"; + storeOptions.Events.DatabaseSchemaName = "issue2529_ancillary"; + storeOptions.DisableNpgsqlLogging = true; + + // The projection that calls RaiseSideEffects → publishes a Wolverine message + storeOptions.Projections.Add(ProjectionLifecycle.Async); + return storeOptions; + }) + .IntegrateWithWolverine() + .AddAsyncDaemon(DaemonMode.Solo); + + opts.Services.AddResourceSetupOnStartup(StartupAction.ResetState); + + opts.Discovery.DisableConventionalDiscovery() + .IncludeType(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + private async Task AppendAndWaitForProjectionAsync(params Guid[] streamIds) + { + using var scope = _host.Services.CreateScope(); + var store = scope.ServiceProvider.GetRequiredService(); + + await using (var session = store.LightweightSession()) + { + foreach (var id in streamIds) + { + session.Events.StartStream(id, new IncrementCounter()); + } + await session.SaveChangesAsync(); + } + + // Make sure the async daemon has caught up to all the events we just wrote. + // This rules out "the daemon is just slow" as the cause of side-effect non-delivery. + using var daemon = await store.BuildProjectionDaemonAsync(); + await daemon.WaitForNonStaleData(60.Seconds()); + } + + [Fact] + public async Task projection_side_effect_message_reaches_wolverine_handler() + { + var streamId = Guid.NewGuid(); + + Issue2529SideEffectHandler.SeenStreamIds.Clear(); + + var tracked = await _host + .TrackActivity() + .Timeout(60.Seconds()) + .IncludeExternalTransports() + .WaitForMessageToBeReceivedAt(_host) + .ExecuteAndWaitAsync((Func)(_ => AppendAndWaitForProjectionAsync(streamId))); + + tracked.Executed.MessagesOf() + .Where(m => m.StreamId == streamId) + .ShouldHaveSingleItem(); + } + + // GH-2529: Was failing because concurrent slice processing in Marten's + // AggregationRunner corrupted MessageContext._outstanding (concurrent + // List.Add is not thread-safe), dropping most side-effect messages + // with no exception. Fixed by adding a lock around _outstanding mutations + // in MessageBus / MessageContext. + [Fact] + public async Task multiple_side_effects_in_one_batch_all_reach_wolverine() + { + var streamIds = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() }; + + Issue2529SideEffectHandler.SeenStreamIds.Clear(); + + var tracked = await _host + .TrackActivity() + .Timeout(60.Seconds()) + .IncludeExternalTransports() + .ExecuteAndWaitAsync((Func)(_ => AppendAndWaitForProjectionAsync(streamIds))); + + var ourStreamIds = streamIds.ToHashSet(); + var seen = tracked.Executed.MessagesOf() + .Where(m => ourStreamIds.Contains(m.StreamId)) + .Select(m => m.StreamId) + .OrderBy(x => x) + .ToArray(); + + seen.ShouldBe(streamIds.OrderBy(x => x).ToArray()); + } +} + +// ── Marker interface for the ancillary store ── +public interface IIssue2529Store : IDocumentStore; + +// ── Domain ── +public record IncrementCounter; + +public class Issue2529Counter +{ + public Guid Id { get; set; } + public int Count { get; set; } +} + +// ── Multi-stream projection that publishes side effects ── +public class Issue2529CounterProjection : MultiStreamProjection +{ + public Issue2529CounterProjection() + { + Identity(x => x.StreamId); + } + + public static Issue2529Counter Create(IncrementCounter @event, IEvent metadata) => + new() { Id = metadata.StreamId, Count = 1 }; + + public void Apply(Issue2529Counter counter, IncrementCounter _) => counter.Count++; + + public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice slice) + { + if (slice.Aggregate is not null) + { + slice.PublishMessage(new CounterIncremented(slice.Aggregate.Id, slice.Aggregate.Count)); + } + return ValueTask.CompletedTask; + } +} + +public record CounterIncremented(Guid StreamId, int Count); + +// ── Wolverine handler — records what it saw so the test can assert ── +public class Issue2529SideEffectHandler +{ + public static Guid LastSeenStreamId = Guid.Empty; + public static readonly List SeenStreamIds = new(); + + public static void Handle(CounterIncremented msg) + { + LastSeenStreamId = msg.StreamId; + lock (SeenStreamIds) SeenStreamIds.Add(msg.StreamId); + Debug.WriteLine($"Side effect received for stream {msg.StreamId} (count={msg.Count})"); + } +} diff --git a/src/Wolverine/Runtime/MessageBus.cs b/src/Wolverine/Runtime/MessageBus.cs index 13db506f5..dba4a3b8b 100644 --- a/src/Wolverine/Runtime/MessageBus.cs +++ b/src/Wolverine/Runtime/MessageBus.cs @@ -14,6 +14,14 @@ public static MessageBus Build(IWolverineRuntime runtime, string correlationId) // ReSharper disable once InconsistentNaming protected readonly List _outstanding = new(); + + // Protects _outstanding from concurrent mutation. The Marten async daemon's + // multi-stream projection runner can call PublishAsync on the same MessageContext + // from many slices in parallel (Block parallelism = 10). Without this lock, + // concurrent List.Add silently corrupts the list and drops messages. + // GH-2529. + // ReSharper disable once InconsistentNaming + protected readonly object _outstandingLock = new(); private string? _tenantId; public MessageBus(IWolverineRuntime runtime) : this(runtime, Activity.Current?.RootId ?? Guid.NewGuid().ToString()) @@ -57,7 +65,17 @@ public virtual Task ReScheduleCurrentAsync(DateTimeOffset rescheduledAt) public IWolverineRuntime Runtime { get; } public IMessageStore Storage { get; internal set; } - public IEnumerable Outstanding => _outstanding; + /// + /// Snapshot of envelopes published in this context that have not yet been flushed. + /// Returns a copy to avoid concurrent-enumeration issues — see . + /// + public IEnumerable Outstanding + { + get + { + lock (_outstandingLock) return _outstanding.ToArray(); + } + } public IEnvelopeTransaction? Transaction { get; protected set; } public Guid ConversationId { get; protected set; } @@ -267,7 +285,10 @@ internal async ValueTask PersistOrSendAsync(Envelope envelope) if (Transaction is not null) { - _outstanding.Fill(envelope); + lock (_outstandingLock) + { + _outstanding.Fill(envelope); + } await envelope.PersistAsync(Transaction); @@ -366,7 +387,10 @@ internal async ValueTask PersistOrSendAsync(params Envelope[] outgoing) await Transaction.PersistAsync(envelopes); - _outstanding.Fill(outgoing); + lock (_outstandingLock) + { + _outstanding.Fill(outgoing); + } } else { diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index 2bf20ad0a..25f29f85b 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -151,12 +151,17 @@ public async Task FlushOutgoingMessagesAsync() await AssertAnyRequiredResponseWasGenerated(); - if (_outstanding.Count == 0) + // Snapshot under lock so concurrent publishes from a Marten projection + // (Block parallelism = 10 in AggregationRunner) cannot corrupt the list + // while we're iterating it. GH-2529. + Envelope[] outgoing; + lock (_outstandingLock) { - return; + if (_outstanding.Count == 0) return; + outgoing = _outstanding.ToArray(); } - foreach (var envelope in Outstanding) + foreach (var envelope in outgoing) { // https://github.com/JasperFx/wolverine/issues/2006 if (envelope == null) continue; @@ -209,8 +214,11 @@ public async Task FlushOutgoingMessagesAsync() } _sent ??= new(); - _sent.AddRange(_outstanding); - _outstanding.Clear(); + lock (_outstandingLock) + { + _sent.AddRange(_outstanding); + _outstanding.Clear(); + } _hasFlushed = true; @@ -236,10 +244,17 @@ public async Task AssertAnyRequiredResponseWasGenerated() if (isMissingRequestedReply()) { var failureDescription = $"No response was created for expected response '{Envelope!.ReplyRequested}' back to reply-uri {Envelope.ReplyUri}. "; - if (_outstanding.Count > 0) + + Envelope[] outstandingSnapshot; + lock (_outstandingLock) { - var types = new List(_outstanding.Count + (_sent?.Count ?? 0)); - foreach (var e in _outstanding) types.Add(e.MessageType!); + outstandingSnapshot = _outstanding.ToArray(); + } + + if (outstandingSnapshot.Length > 0) + { + var types = new List(outstandingSnapshot.Length + (_sent?.Count ?? 0)); + foreach (var e in outstandingSnapshot) types.Add(e.MessageType!); if (_sent != null) { foreach (var e in _sent) types.Add(e.MessageType!); @@ -458,13 +473,19 @@ public async ValueTask SendFailureAcknowledgementAsync(string failureDescription Task IEnvelopeTransaction.PersistOutgoingAsync(Envelope envelope) { - _outstanding.Fill(envelope); + lock (_outstandingLock) + { + _outstanding.Fill(envelope); + } return Task.CompletedTask; } Task IEnvelopeTransaction.PersistOutgoingAsync(Envelope[] envelopes) { - _outstanding.Fill(envelopes); + lock (_outstandingLock) + { + _outstanding.Fill(envelopes); + } return Task.CompletedTask; } @@ -519,7 +540,12 @@ public override async Task ReScheduleCurrentAsync(DateTimeOffset rescheduledAt) internal async Task CopyToAsync(IEnvelopeTransaction other) { - await other.PersistOutgoingAsync(_outstanding.ToArray()); + Envelope[] snapshot; + lock (_outstandingLock) + { + snapshot = _outstanding.ToArray(); + } + await other.PersistOutgoingAsync(snapshot); foreach (var envelope in Scheduled) await other.PersistIncomingAsync(envelope); } @@ -530,7 +556,10 @@ internal async Task CopyToAsync(IEnvelopeTransaction other) public async ValueTask ClearAllAsync() { Scheduled.Clear(); - _outstanding.Clear(); + lock (_outstandingLock) + { + _outstanding.Clear(); + } if (Transaction != null) { @@ -649,7 +678,10 @@ internal void ClearState() _hasFlushed = false; _sent?.Clear(); - _outstanding.Clear(); + lock (_outstandingLock) + { + _outstanding.Clear(); + } Scheduled.Clear(); Envelope = null; Transaction = null; @@ -679,7 +711,10 @@ internal void ReadEnvelope(Envelope? originalEnvelope, IChannelCallback channel) var ackEnvelope = Runtime.RoutingFor(typeof(Acknowledgement)) .RouteToDestination(ack, Envelope.ReplyUri, null); TrackEnvelopeCorrelation(ackEnvelope, Activity.Current); - _outstanding.Add(ackEnvelope); + lock (_outstandingLock) + { + _outstanding.Add(ackEnvelope); + } } }