diff --git a/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs b/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs index a5ee3862ea..3e1995aea5 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs @@ -120,6 +120,48 @@ public async Task can_use_version_metadata_on_existing_stream_with_expected_vers ProjectionWithVersions.VersionsSeen.ShouldBe([5, 6, 7, 8]); } + + // Regression for #4481: when an Evolve-based inline projection's stream is + // fetched via FetchForWriting and no events are appended, an unrelated + // document insert in the same session was failing with + // JasperFx.ConcurrencyException because the empty stream still flowed + // through the inline-projection Apply step and queued a storage operation + // that did an optimistic-concurrency check against the unchanged version. + [Fact] + public async Task fetch_for_writing_without_appending_does_not_block_unrelated_inserts() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.Schema.For().Identity(x => x.Id); + }); + + var streamId = Guid.NewGuid(); + + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, new AEvent(), new BEvent()); + await session.SaveChangesAsync(); + } + + await using (var session = theStore.LightweightSession()) + { + // Fetch the stream but deliberately do not append any new events. + await session.Events.FetchForWriting(streamId); + + // Insert an unrelated document on the same session. + session.Store(new UnrelatedDoc { Id = streamId, Note = "no new events" }); + + // Should NOT throw — the empty stream must not propagate an + // optimistic-concurrency check on the unchanged aggregate. + await session.SaveChangesAsync(); + } + + await using var verify = theStore.QuerySession(); + var unrelated = await verify.LoadAsync(streamId); + unrelated.ShouldNotBeNull(); + unrelated.Note.ShouldBe("no new events"); + } } public partial class ProjectionWithVersions : SingleStreamProjection @@ -166,3 +208,9 @@ public class VersionedGuy public long Version { get; set; } } + +public class UnrelatedDoc +{ + public Guid Id { get; set; } + public string Note { get; set; } = ""; +} diff --git a/src/Marten/Events/QuickEventAppender.cs b/src/Marten/Events/QuickEventAppender.cs index 1d89d5efec..a85eae9fff 100644 --- a/src/Marten/Events/QuickEventAppender.cs +++ b/src/Marten/Events/QuickEventAppender.cs @@ -108,9 +108,11 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase // 9.0 (#4306): pass the tracker collection directly now that the // IInlineProjection contract takes IEnumerable. + // Issue #4481: filter to streams with events. See the matching + // guard in RichEventAppender.ProcessEventsAsync for context. foreach (var projection in inlineProjections) { - await projection.ApplyAsync(session, session.WorkTracker.Streams, token).ConfigureAwait(false); + await projection.ApplyAsync(session, session.WorkTracker.Streams.Where(x => x.Events.Any()), token).ConfigureAwait(false); } } } diff --git a/src/Marten/Events/RichEventAppender.cs b/src/Marten/Events/RichEventAppender.cs index 0e9b6944b2..9784858caa 100644 --- a/src/Marten/Events/RichEventAppender.cs +++ b/src/Marten/Events/RichEventAppender.cs @@ -87,9 +87,23 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase // 9.0 (#4306): IInlineProjection.ApplyAsync now takes IEnumerable, // so we can pass the session's tracker collection directly without // allocating a fresh List on every SaveChangesAsync. + // + // Issue #4481: only pass streams that actually have events. An empty + // stream (e.g. FetchForWriting called without any + // subsequent AppendOne/AppendMany) used to slip through here and + // trigger an inline projection's snapshot-write path on the unchanged + // aggregate, raising a JasperFx.ConcurrencyException on the next + // SaveChangesAsync for any other work on the same session. The + // upstream JasperFx aggregation base's `AppliesTo(eventTypes)` + // returns `true` unconditionally when the projection has no + // statically-known event types (Evolve-only projections), so the + // empty stream was not being filtered downstream. The lazy `Where` + // preserves the no-allocation property that #4306 introduced — the + // filter is one tiny iterator object per projection call instead of + // a fresh List on every save. foreach (var projection in inlineProjections) { - await projection.ApplyAsync(session, session.WorkTracker.Streams, token).ConfigureAwait(false); + await projection.ApplyAsync(session, session.WorkTracker.Streams.Where(x => x.Events.Any()), token).ConfigureAwait(false); } } }