From e7b775fc6f2bb39c9aaf1198322c040488ba1959 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 19 May 2026 07:25:49 -0500 Subject: [PATCH] Fix #4481: FetchForWriting without appending blocked unrelated inserts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When an inline aggregate's stream was fetched via FetchForWriting and no events were subsequently appended, an unrelated document insert on the same session would fail with JasperFx.ConcurrencyException on SaveChangesAsync — the empty stream still flowed through the inline projection's Apply step and queued a snapshot-update operation against the unchanged aggregate version, which then failed the optimistic-concurrency check. The symptom reproduced for projections that override Evolve directly but not for the conventional Apply/Create method shape. Root cause is upstream in JasperFx.Events 1.x: JasperFxAggregationProjectionBase.AppliesTo(eventTypes) // Have to do this because you don't know if any events catch if (AllEventTypes.Length == 0) return true; return eventTypes.Intersect(AllEventTypes).Any() || ...; For Apply/Create projections AllEventTypes is populated from the discovered handler methods, so an empty stream's empty eventTypes collection cleanly evaluates to false and the stream is screened out. For Evolve-based projections AllEventTypes is empty, the early-out fires `return true`, and the empty stream slips through. Fix lives in Marten's two inline-appender entry points (RichEventAppender, QuickEventAppender): only pass streams that actually have events to the inline-projection ApplyAsync calls. Mirrors the same `x.Events.Any()` filter that RichEventAppender's streamActions branch already uses on the storage side. Regression test added in src/EventSourcingTests/FetchForWriting/ fetch_for_writing_and_projection_metadata_for_inline_projections.cs that fails without the fix (the exact symptom from the issue: ConcurrencyException on the unchanged VersionedGuy) and passes with it. Closes #4481. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...jection_metadata_for_inline_projections.cs | 48 +++++++++++++++++++ src/Marten/Events/QuickEventAppender.cs | 4 +- src/Marten/Events/RichEventAppender.cs | 16 ++++++- 3 files changed, 66 insertions(+), 2 deletions(-) diff --git a/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs b/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs index a5ee3862ea..3e1995aea5 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs @@ -120,6 +120,48 @@ public async Task can_use_version_metadata_on_existing_stream_with_expected_vers ProjectionWithVersions.VersionsSeen.ShouldBe([5, 6, 7, 8]); } + + // Regression for #4481: when an Evolve-based inline projection's stream is + // fetched via FetchForWriting and no events are appended, an unrelated + // document insert in the same session was failing with + // JasperFx.ConcurrencyException because the empty stream still flowed + // through the inline-projection Apply step and queued a storage operation + // that did an optimistic-concurrency check against the unchanged version. + [Fact] + public async Task fetch_for_writing_without_appending_does_not_block_unrelated_inserts() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.Schema.For().Identity(x => x.Id); + }); + + var streamId = Guid.NewGuid(); + + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, new AEvent(), new BEvent()); + await session.SaveChangesAsync(); + } + + await using (var session = theStore.LightweightSession()) + { + // Fetch the stream but deliberately do not append any new events. + await session.Events.FetchForWriting(streamId); + + // Insert an unrelated document on the same session. + session.Store(new UnrelatedDoc { Id = streamId, Note = "no new events" }); + + // Should NOT throw — the empty stream must not propagate an + // optimistic-concurrency check on the unchanged aggregate. + await session.SaveChangesAsync(); + } + + await using var verify = theStore.QuerySession(); + var unrelated = await verify.LoadAsync(streamId); + unrelated.ShouldNotBeNull(); + unrelated.Note.ShouldBe("no new events"); + } } public partial class ProjectionWithVersions : SingleStreamProjection @@ -166,3 +208,9 @@ public class VersionedGuy public long Version { get; set; } } + +public class UnrelatedDoc +{ + public Guid Id { get; set; } + public string Note { get; set; } = ""; +} diff --git a/src/Marten/Events/QuickEventAppender.cs b/src/Marten/Events/QuickEventAppender.cs index 1d89d5efec..a85eae9fff 100644 --- a/src/Marten/Events/QuickEventAppender.cs +++ b/src/Marten/Events/QuickEventAppender.cs @@ -108,9 +108,11 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase // 9.0 (#4306): pass the tracker collection directly now that the // IInlineProjection contract takes IEnumerable. + // Issue #4481: filter to streams with events. See the matching + // guard in RichEventAppender.ProcessEventsAsync for context. foreach (var projection in inlineProjections) { - await projection.ApplyAsync(session, session.WorkTracker.Streams, token).ConfigureAwait(false); + await projection.ApplyAsync(session, session.WorkTracker.Streams.Where(x => x.Events.Any()), token).ConfigureAwait(false); } } } diff --git a/src/Marten/Events/RichEventAppender.cs b/src/Marten/Events/RichEventAppender.cs index 0e9b6944b2..9784858caa 100644 --- a/src/Marten/Events/RichEventAppender.cs +++ b/src/Marten/Events/RichEventAppender.cs @@ -87,9 +87,23 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase // 9.0 (#4306): IInlineProjection.ApplyAsync now takes IEnumerable, // so we can pass the session's tracker collection directly without // allocating a fresh List on every SaveChangesAsync. + // + // Issue #4481: only pass streams that actually have events. An empty + // stream (e.g. FetchForWriting called without any + // subsequent AppendOne/AppendMany) used to slip through here and + // trigger an inline projection's snapshot-write path on the unchanged + // aggregate, raising a JasperFx.ConcurrencyException on the next + // SaveChangesAsync for any other work on the same session. The + // upstream JasperFx aggregation base's `AppliesTo(eventTypes)` + // returns `true` unconditionally when the projection has no + // statically-known event types (Evolve-only projections), so the + // empty stream was not being filtered downstream. The lazy `Where` + // preserves the no-allocation property that #4306 introduced — the + // filter is one tiny iterator object per projection call instead of + // a fresh List on every save. foreach (var projection in inlineProjections) { - await projection.ApplyAsync(session, session.WorkTracker.Streams, token).ConfigureAwait(false); + await projection.ApplyAsync(session, session.WorkTracker.Streams.Where(x => x.Events.Any()), token).ConfigureAwait(false); } } }