Fix #4657: bypass session VersionTracker from projection StoreProjection (async daemon race)#4658
Merged
Merged
Conversation
The async daemon's AggregationRunner.BuildBatchAsync runs slice handlers through `new Block<EventSliceExecution>(10, ...)` -- ten parallel workers all calling ApplyChangesAsync against the same IProjectionBatch (and therefore the same shared IMartenSession). Each call lands in StoreProjection -> _storage.Overwrite(aggregate, _session, TenantId) -> RevisionsFor(session) -> session.Versions.RevisionsFor<TDoc, TId>(), which mutates the VersionTracker's plain Dictionary<Type, object> _byType. The races throw "Operations that change non-concurrent collections must have exclusive access. A concurrent update was performed on this collection and corrupted its state. The collection's state is no longer correct." DocumentSessionBase / IMartenSession are by-contract not thread-safe, so the fix isn't to synchronize VersionTracker (the same race would surface next on UnitOfWork._eventOperations, the identity map, etc.). Projections don't actually need session-level version tracking on this path -- the revision is set explicitly from the event right after the op is built and IgnoreConcurrencyViolation is forced true, so the tracker bookkeeping is dead weight. The fix is to give projections a lighter-weight overwrite that never touches the session. - Add IStorageOperation OverwriteProjected(T, string) to IDocumentStorage<T>. - Override on each closed-shape storage variant (Lightweight, IdentityMap, DirtyChecked) to construct ClosedShapeOverwriteOperation directly with null for both version / revision tracker dicts (ClosedShapeOverwriteOperation already tolerates this via its existing Dictionary?<,> fields). - QueryOnly closed-shape throws NotSupported, matching its Overwrite shape. - SubClassDocumentStorage + ValueTypeIdentifiedDocumentStorage delegate to the inner storage's OverwriteProjected. - EventDocumentStorage + EventMapping<T> throw NotSupported (events aren't overwritten via the projection storage path). - Switch ProjectionStorage<TDoc, TId>.StoreProjection to call OverwriteProjected(aggregate, TenantId) -- no session arg, no race possible by construction. Full solution builds clean. Integration tests are run by CI (local Marten test Postgres on 5432 not running at the moment). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…eration Postprocess The OverwriteProjected path correctly constructs the operation with null version / revision dictionaries (that's the whole point — never reach for session-shared tracker state from parallel daemon workers). But the Postprocess `ApplyConcurrencyResult` still null-bangs into those dicts on the RETURNING row, which threw NullReferenceException on any async-daemon projection over a document with ConcurrencyMode.Optimistic or .Numeric. Surfaced as CI failures on PR #4658: archiving_events.capture_archived_event_with_async_projection_will_archive_the_stream, UpcastersTests.HavingEvents_WithSchemaChange_AggregationShouldWork, and the async-tracking + projection cascade that depended on those projections producing output. Fix: gate the tracker writes on `is not null`. Still consume the reader row so the OperationPage cursor stays aligned, and still apply the revision back onto the document (harmless and consistent with the non-projection path). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
jeremydmiller
added a commit
that referenced
this pull request
Jun 5, 2026
…ions / storages / selectors
Eliminates every per-operation and per-row read of
_descriptor.ConcurrencyMode in src/Marten/Internal/ClosedShape/ by splitting
each writeable closed-shape class into three sealed leaves keyed on the
mode:
Operations (each kind → 1 abstract base + 3 sealed leaves):
ClosedShape{Insert,Update,Upsert,Overwrite}Operation
→ Unversioned/Optimistic/Numeric ClosedShape{X}Operation
12 sealed leaves; bind exactly the slots their mode needs, no runtime
switch on ConcurrencyMode. Numeric variants own the UseVersionFromMatchingStream
subquery layout. Optimistic leaves carry typed Dictionary<TId, Guid>
trackers; Numeric leaves carry typed Dictionary<TId, long> trackers;
Unversioned leaves carry no tracker at all. Optimistic/Numeric Overwrite
keep the tracker nullable to preserve the #4658 OverwriteProjected
contract (projection path passes null).
Storages (each writeable kind → 1 abstract base + 3 sealed leaves):
{Lightweight,IdentityMap,DirtyChecked}ClosedShapeStorage
→ Unversioned/Optimistic/Numeric {X}ClosedShapeStorage
9 sealed leaves. Each leaf's Insert/Update/Upsert/Overwrite/
OverwriteProjected/BuildSelector factories construct the matching
concurrency-mode operation + selector directly. The shared
VersionsFor/RevisionsFor helpers are gone — each leaf knows its mode.
QueryOnlyClosedShapeStorage is unchanged (no write path).
Selectors (each tracking kind → 1 abstract base + 3 sealed leaves):
ClosedShape{Lightweight,IdentityMap,DirtyTracking}Selector
→ Unversioned/Optimistic/Numeric ClosedShape{X}Selector
9 sealed leaves. Each leaf overrides CaptureVersion with the
monomorphic per-row implementation (no-op / Guid capture / long
capture). The per-row hot path now contains zero ConcurrencyMode reads.
ClosedShapeQueryOnlySelector unchanged (no concurrency interaction).
Registration:
ClosedShapeRegistration.BuildProvider gains three small dispatch
helpers (BuildLightweightStorage / BuildIdentityMapStorage /
BuildDirtyCheckedStorage) that switch on descriptor.ConcurrencyMode
ONCE at registration time and construct the right concurrency-specific
storage leaf. The closed-shape DocumentProvider keeps its 4-tuple
shape; only the writeable members change leaf identity.
Public API impact:
IDocumentStorage<T, TId> and DocumentStorage<T, TId> are unchanged.
The three writeable storage classes (LightweightClosedShapeStorage,
IdentityMapClosedShapeStorage, DirtyCheckedClosedShapeStorage) flip
from `public sealed` → `public abstract` — they still exist as public
types but are no longer directly instantiable. Existing
ClosedShapeRegistration callers + the W3 spike's UseLightweightSequentialGuidClosedShape /
UseExternallyAssignedStringClosedShape extensions continue to work
because the registration internals build the leaves. All new leaves
are `internal sealed`.
Numeric leaves use `await reader.GetFieldValueAsync<long>(0, token)`
instead of the synchronous read the original used inside the private
ApplyConcurrencyResult helper — the VSTHRD103 analyzer flags it once
the read is hoisted into PostprocessAsync directly. Behaviorally equivalent.
DescriptorBuilder and DescriptorTorageDescriptor itself are unchanged —
ConcurrencyMode is now factory input only, not a runtime read.
Acceptance criteria (issue #4659 Phase 1):
- `grep -rn "_descriptor\\.ConcurrencyMode" src/Marten/Internal/ClosedShape/`
returns 0 hits (modulo ClosedShapeRegistration's 3 dispatch sites + 1
comment).
- Every new variant is `internal sealed`.
- No new public API surface; the three writeable storage class names
are preserved (now abstract bases).
- DocumentDbTests: 999/1000 passed locally on net9 (one unrelated
pre-existing skip).
- PatchingTests: 122/123 passed locally on net9 (one unrelated
pre-existing skip).
Phase 2 (hierarchy selector split — `HierarchyMapping is { }` per-row
branch in the four read selectors) is the planned follow-up PR per the
issue's phased plan.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This was referenced Jun 5, 2026
jeremydmiller
added a commit
that referenced
this pull request
Jun 5, 2026
…trackers Async-daemon parallel slice workers share a single DocumentSessionBase; the projection write path was still reaching into _session.Versions / _session.Revisions / _session.ItemMap from inside the parallel block. #4658 closed StoreProjection; this phase closes the remaining write sites that ProjectionStorage's Store(snapshot) and Store(snapshot, id, tenantId) routed through. Surface change: IDocumentStorage<T>.UpsertProjected(T, string tenantId) IDocumentStorage<T>.InsertProjected(T, string tenantId) IDocumentStorage<T>.UpdateProjected(T, string tenantId) These mirror the existing OverwriteProjected pattern. The 9 writeable closed-shape storage leaves build their corresponding closed-shape op with null trackers; QueryOnly throws NotSupported; SubClass / ValueType / Event storages delegate or throw consistently with their existing OverwriteProjected implementations. Null-tolerance audit applied to all four ConcurrencyMode op classes that hold a tracker dictionary (Optimistic Insert/Update/Upsert + Numeric Insert/Update/ Upsert): _versions / _revisions are now nullable and the dict writes in PostprocessAsync are guarded. Optimistic Update/Upsert's ConfigureCommand treats a null tracker as "expected version unknown" and binds DBNull for the WHERE-guard slot. ProjectionStorage rewrites: * Store(snapshot) -> _storage.UpsertProjected(snapshot, TenantId) * Store(snapshot, id, tenantId) -> _storage.UpsertProjected(snapshot, tenantId) The GH-3850 identity-map maintenance (EjectAggregateFromIdentityMap + _storage.Store(_session, snapshot)) is preserved but gated on Options.EventGraph.UseIdentityMapForAggregates so the default (false) path takes the session-state-free write and the opt-in (true) path still gets correct inline-projection-rewriting-an-immutable-aggregate semantics. Per the #4667 Phase 3 design note, opt-in is documented as not safe for parallel projection workers — that race is accepted with the flag on. Verified locally: * DaemonTests net9.0 — 187 / 187 green * EventSourcingTests net9.0 — 1361 / 1368 green (7 pre-existing skips, including the GH-3850 regression which still passes) * CoreTests net9.0 — 421 / 422 green (1 pre-existing skip) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
jeremydmiller
added a commit
that referenced
this pull request
Jun 5, 2026
…trackers Closes the remaining read sites that ProjectionStorage's LoadAsync / LoadManyAsync still routed through the session-aware closed-shape selectors, which per-row write into _session.Versions / _session.ItemMap / _session.ChangeTrackers (the same race shape #4658 + Phase 1 closed for the write path). Surface change: IDocumentStorage<T, TId>.LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token) IDocumentStorage<T, TId>.LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token) These take IMartenDatabase + tenantId, not a session — the issue's stated goal that storage read methods never accept an IMartenSession argument. Implementation: a new ClosedShapeProjectionLoader<TDoc, TId> static helper opens a fresh connection from the database, executes the existing BuildLoad{,Many}Command SQL, and deserializes the data column directly via ISerializer.FromJsonAsync. Hierarchical storages dispatch through DocumentMapping.TypeFor like HierarchicalClosedShapeQueryOnlySelector. No metadata binders are applied — projections care about aggregate state, not per-row CreatedAt / Headers / etc.; if a future projection scenario needs metadata it can be added here as a focused follow-up. Column layout matches the writeable closed-shape selectors (Lightweight / IdentityMap / DirtyChecked): id at col 0, data at col 1, metadata at 2+. This was the v1 bug — I'd started with the QueryOnly layout (data at col 0) and the first DaemonTests run failed 19 tests with JsonReaderException because the loader was deserializing the id column bytes as JSON. Storage wiring: * LightweightClosedShapeStorage / IdentityMapClosedShapeStorage / DirtyCheckedClosedShapeStorage (the 3 writeable closed-shape mid-tiers) override with the helper. * QueryOnlyClosedShapeStorage throws NotSupported — it isn't used by the projection read path; ProjectionStorage holds a writeable storage for the projected document type. * SubClassDocumentStorage delegates to parent + downcast like its other Load delegations. * ValueTypeIdentifiedDocumentStorage delegates to inner with the unwrapped id. * DocumentStorage<T, TId> base default impl throws NotImplementedException for non-closed-shape paths (legacy Roslyn-generated storages) — those aren't projection-eligible by construction. * IDocumentStorage<T> (single-T) doesn't get the new methods — they're on the <T, TId> interface only, so EventDocumentStorage / EventMapping<T> don't need to implement them (events aren't projected documents). ProjectionStorage rewrites: * LoadAsync(id, ct) — gated on UseIdentityMapForAggregates. Default (false) routes through LoadProjectedAsync. The opt-in (true) case falls through to the session-aware LoadAsync to preserve the inline-projection identity-map semantics that fetching_inline_aggregates_for_writing. silently_turns_on_identity_map_for_inline_aggregates depends on — the inline projection needs to mutate the same instance the IdentitySession holds. Matches the same gating Phase 1 applied to Store(snapshot, id, ...) for GH-3850. * LoadManyAsync(identities, ct) — same gating. Verified locally on net9.0: * DaemonTests — 187 / 187 ✅ * EventSourcingTests — 1361 passed / 7 pre-existing skips ✅ (including the GH-3850 regression and the silently_turns_on_identity_map_for_inline_aggregates regression that caught the v1 gating gap) * CoreTests — 421 passed / 1 pre-existing skip ✅ Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Fixes #4657.
Problem
A user running an async-daemon
MultiStreamProjection(same race exists forSingleStreamProjection) hit:```
System.InvalidOperationException: Operations that change non-concurrent collections must have exclusive access. ...
at System.Collections.Generic.Dictionary`2.TryInsert(...)
at Marten.Internal.VersionTracker.RevisionsForTDoc,TId ← Dictionary<Type, object> _byType, no sync
at Marten.Internal.ClosedShape.LightweightClosedShapeStorage`2.Overwrite(...)
at Marten.Internal.Sessions.ProjectionStorage`2.StoreProjection(...)
at JasperFx.Events.Daemon.AggregationRunner`4.ApplyChangesAsync(...)
```
AggregationRunner.BuildBatchAsync(in JasperFx.Events) spins upnew Block<EventSliceExecution>(10, ...)— ten parallel workers all callingApplyChangesAsyncagainst the sameIProjectionBatch/IMartenSession. Each call goes throughStoreProjection→_storage.Overwrite(aggregate, _session, TenantId)→RevisionsFor(session)→ mutating insert onVersionTracker._byType. The first parallel calls race on that insert, andDictionarydetects the concurrent modification.DocumentSessionBase/IMartenSessionare by-contract not thread-safe, so synchronizingVersionTrackerwould just be papering over the wrong layer — the same race would surface next onUnitOfWork._eventOperations, the identity map, etc.Fix — give projections a session-free overwrite
StoreProjection()doesn't actually need session-level version tracking on this path: the revision is set explicitly from the event right after the op is built, andIgnoreConcurrencyViolation = truemakes any tracker bookkeeping moot. So the right move is to not callIDocumentStorage.Overwrite(...)from projections.IStorageOperation OverwriteProjected(T document, string tenantId)toIDocumentStorage<T, TId>and as an abstract method onDocumentStorage<T, TId>.ClosedShapeOverwriteOperationdirectly withnullfor both tracker dicts (the op already tolerates this — that's howConcurrencyMode.Offworks today via the existingVersionsFor/RevisionsForhelpers).QueryOnlyClosedShapeStoragethrowsNotSupported, matching itsOverwrite.SubClassDocumentStorage+ValueTypeIdentifiedDocumentStoragedelegate to the inner storage.EventDocumentStorage+EventMapping<T>throwNotSupported(events aren't overwritten via the projection storage path).ProjectionStorage<TDoc, TId>.StoreProjectionto callOverwriteProjected(aggregate, TenantId)— no session arg.After this, the projection write path never touches
_session.Versions, so 10-way parallelApplyChangesAsyncis safe on the projection path by construction (no shared mutable session state read or written along the way).Scope note
This resolves the reported crash. The broader "
DocumentSessionBaseisn't thread-safe but the daemon shares one across 10 parallel workers" question stays open —_session.QueueOperation(op)still mutatesUnitOfWork._eventOperations(a plainList<T>) concurrently. That doesn't fire aDictionary-style modification check, so it corrupts silently rather than throwing. The proper architectural fix lives daemon-side (serialize the storage-mutation phase, or use per-task sessions); intentionally left for a follow-up.Validation
net9.0andnet10.0)._session), so a clean local build + CI cover it adequately.🤖 Generated with Claude Code