diff --git a/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs b/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs index 6ed265700c..898c5baea4 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs @@ -120,6 +120,48 @@ public async Task can_use_version_metadata_on_existing_stream_with_expected_vers ProjectionWithVersions.VersionsSeen.ShouldBe([5, 6, 7, 8]); } + + // Regression for #4481: when an Evolve-based inline projection's stream is + // fetched via FetchForWriting and no events are appended, an unrelated + // document insert in the same session was failing with + // JasperFx.ConcurrencyException because the empty stream still flowed + // through the inline-projection Apply step and queued a storage operation + // that did an optimistic-concurrency check against the unchanged version. + [Fact] + public async Task fetch_for_writing_without_appending_does_not_block_unrelated_inserts() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.Schema.For().Identity(x => x.Id); + }); + + var streamId = Guid.NewGuid(); + + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, new AEvent(), new BEvent()); + await session.SaveChangesAsync(); + } + + await using (var session = theStore.LightweightSession()) + { + // Fetch the stream but deliberately do not append any new events. + await session.Events.FetchForWriting(streamId); + + // Insert an unrelated document on the same session. + session.Store(new UnrelatedDoc { Id = streamId, Note = "no new events" }); + + // Should NOT throw — the empty stream must not propagate an + // optimistic-concurrency check on the unchanged aggregate. + await session.SaveChangesAsync(); + } + + await using var verify = theStore.QuerySession(); + var unrelated = await verify.LoadAsync(streamId); + unrelated.ShouldNotBeNull(); + unrelated.Note.ShouldBe("no new events"); + } } public class ProjectionWithVersions : SingleStreamProjection @@ -166,3 +208,9 @@ public class VersionedGuy public int Version { get; set; } } + +public class UnrelatedDoc +{ + public Guid Id { get; set; } + public string Note { get; set; } = ""; +} diff --git a/src/Marten/Events/QuickEventAppender.cs b/src/Marten/Events/QuickEventAppender.cs index baabef6d3c..22886ed176 100644 --- a/src/Marten/Events/QuickEventAppender.cs +++ b/src/Marten/Events/QuickEventAppender.cs @@ -85,9 +85,14 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase { registerOperationsForStreams(eventGraph, session); + // Issue #4481: only pass streams that actually have events. See the + // matching guard in RichEventAppender.ProcessEventsAsync for context. + var streamsWithEvents = session.WorkTracker.Streams.Where(x => x.Events.Any()).ToList(); + if (streamsWithEvents.Count == 0) return; + foreach (var projection in inlineProjections) { - await projection.ApplyAsync(session, session.WorkTracker.Streams.ToList(), token).ConfigureAwait(false); + await projection.ApplyAsync(session, streamsWithEvents, token).ConfigureAwait(false); } } } diff --git a/src/Marten/Events/RichEventAppender.cs b/src/Marten/Events/RichEventAppender.cs index c400449b87..10618c5aae 100644 --- a/src/Marten/Events/RichEventAppender.cs +++ b/src/Marten/Events/RichEventAppender.cs @@ -80,9 +80,24 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase } // TODO -- look for opportunities to batch up the requests here too! - foreach (var projection in inlineProjections) + // + // Issue #4481: only pass streams that actually have events. An empty + // stream (e.g. FetchForWriting called without any + // subsequent AppendOne/AppendMany) used to slip through here and + // trigger an inline projection's snapshot-write path on the unchanged + // aggregate, raising a JasperFx.ConcurrencyException on the next + // SaveChangesAsync for any other work on the same session. The + // upstream JasperFx aggregation base's `AppliesTo(eventTypes)` + // returns `true` unconditionally when the projection has no + // statically-known event types (Evolve-only projections), so the + // empty stream was not being filtered downstream. + var streamsWithEvents = session.WorkTracker.Streams.Where(x => x.Events.Any()).ToList(); + if (streamsWithEvents.Count > 0) { - await projection.ApplyAsync(session, session.WorkTracker.Streams.ToList(), token).ConfigureAwait(false); + foreach (var projection in inlineProjections) + { + await projection.ApplyAsync(session, streamsWithEvents, token).ConfigureAwait(false); + } } } }