From e1626128df7c91cbce8e0dd869edfe32e0102039 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 19 May 2026 07:25:49 -0500 Subject: [PATCH] Fix #4481: FetchForWriting without appending blocked unrelated inserts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When an inline aggregate's stream was fetched via FetchForWriting and no events were subsequently appended, an unrelated document insert on the same session would fail with JasperFx.ConcurrencyException on SaveChangesAsync — the empty stream still flowed through the inline projection's Apply step and queued a snapshot-update operation against the unchanged aggregate version, which then failed the optimistic-concurrency check. The symptom reproduced for projections that override Evolve directly but not for the conventional Apply/Create method shape. Root cause is upstream in JasperFx.Events 1.x: JasperFxAggregationProjectionBase.AppliesTo(eventTypes) // Have to do this because you don't know if any events catch if (AllEventTypes.Length == 0) return true; return eventTypes.Intersect(AllEventTypes).Any() || ...; For Apply/Create projections AllEventTypes is populated from the discovered handler methods, so an empty stream's empty eventTypes collection cleanly evaluates to false and the stream is screened out. For Evolve-based projections AllEventTypes is empty, the early-out fires `return true`, and the empty stream slips through. Fix lives in Marten's two inline-appender entry points (RichEventAppender, QuickEventAppender): only pass streams that actually have events to the inline-projection ApplyAsync calls. Mirrors the same `x.Events.Any()` filter that RichEventAppender's streamActions branch already uses on the storage side. Regression test added in src/EventSourcingTests/FetchForWriting/ fetch_for_writing_and_projection_metadata_for_inline_projections.cs that fails without the fix (the exact symptom from the issue: ConcurrencyException on the unchanged VersionedGuy) and passes with it. Closes #4481. Co-Authored-By: Claude Opus 4.7 (1M context) --- ...jection_metadata_for_inline_projections.cs | 48 +++++++++++++++++++ src/Marten/Events/QuickEventAppender.cs | 7 ++- src/Marten/Events/RichEventAppender.cs | 19 +++++++- 3 files changed, 71 insertions(+), 3 deletions(-) diff --git a/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs b/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs index 6ed265700c..898c5baea4 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_for_writing_and_projection_metadata_for_inline_projections.cs @@ -120,6 +120,48 @@ public async Task can_use_version_metadata_on_existing_stream_with_expected_vers ProjectionWithVersions.VersionsSeen.ShouldBe([5, 6, 7, 8]); } + + // Regression for #4481: when an Evolve-based inline projection's stream is + // fetched via FetchForWriting and no events are appended, an unrelated + // document insert in the same session was failing with + // JasperFx.ConcurrencyException because the empty stream still flowed + // through the inline-projection Apply step and queued a storage operation + // that did an optimistic-concurrency check against the unchanged version. + [Fact] + public async Task fetch_for_writing_without_appending_does_not_block_unrelated_inserts() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + opts.Schema.For().Identity(x => x.Id); + }); + + var streamId = Guid.NewGuid(); + + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, new AEvent(), new BEvent()); + await session.SaveChangesAsync(); + } + + await using (var session = theStore.LightweightSession()) + { + // Fetch the stream but deliberately do not append any new events. + await session.Events.FetchForWriting(streamId); + + // Insert an unrelated document on the same session. + session.Store(new UnrelatedDoc { Id = streamId, Note = "no new events" }); + + // Should NOT throw — the empty stream must not propagate an + // optimistic-concurrency check on the unchanged aggregate. + await session.SaveChangesAsync(); + } + + await using var verify = theStore.QuerySession(); + var unrelated = await verify.LoadAsync(streamId); + unrelated.ShouldNotBeNull(); + unrelated.Note.ShouldBe("no new events"); + } } public class ProjectionWithVersions : SingleStreamProjection @@ -166,3 +208,9 @@ public class VersionedGuy public int Version { get; set; } } + +public class UnrelatedDoc +{ + public Guid Id { get; set; } + public string Note { get; set; } = ""; +} diff --git a/src/Marten/Events/QuickEventAppender.cs b/src/Marten/Events/QuickEventAppender.cs index baabef6d3c..22886ed176 100644 --- a/src/Marten/Events/QuickEventAppender.cs +++ b/src/Marten/Events/QuickEventAppender.cs @@ -85,9 +85,14 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase { registerOperationsForStreams(eventGraph, session); + // Issue #4481: only pass streams that actually have events. See the + // matching guard in RichEventAppender.ProcessEventsAsync for context. + var streamsWithEvents = session.WorkTracker.Streams.Where(x => x.Events.Any()).ToList(); + if (streamsWithEvents.Count == 0) return; + foreach (var projection in inlineProjections) { - await projection.ApplyAsync(session, session.WorkTracker.Streams.ToList(), token).ConfigureAwait(false); + await projection.ApplyAsync(session, streamsWithEvents, token).ConfigureAwait(false); } } } diff --git a/src/Marten/Events/RichEventAppender.cs b/src/Marten/Events/RichEventAppender.cs index c400449b87..10618c5aae 100644 --- a/src/Marten/Events/RichEventAppender.cs +++ b/src/Marten/Events/RichEventAppender.cs @@ -80,9 +80,24 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase } // TODO -- look for opportunities to batch up the requests here too! - foreach (var projection in inlineProjections) + // + // Issue #4481: only pass streams that actually have events. An empty + // stream (e.g. FetchForWriting called without any + // subsequent AppendOne/AppendMany) used to slip through here and + // trigger an inline projection's snapshot-write path on the unchanged + // aggregate, raising a JasperFx.ConcurrencyException on the next + // SaveChangesAsync for any other work on the same session. The + // upstream JasperFx aggregation base's `AppliesTo(eventTypes)` + // returns `true` unconditionally when the projection has no + // statically-known event types (Evolve-only projections), so the + // empty stream was not being filtered downstream. + var streamsWithEvents = session.WorkTracker.Streams.Where(x => x.Events.Any()).ToList(); + if (streamsWithEvents.Count > 0) { - await projection.ApplyAsync(session, session.WorkTracker.Streams.ToList(), token).ConfigureAwait(false); + foreach (var projection in inlineProjections) + { + await projection.ApplyAsync(session, streamsWithEvents, token).ConfigureAwait(false); + } } } }