From fa699ba042e8729fd7a1603bbc63f6036ea43b4e Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Fri, 5 Jun 2026 09:46:12 -0500 Subject: [PATCH] =?UTF-8?q?#4667=20Phase=201=20=E2=80=94=20write-path=20*P?= =?UTF-8?q?rojected=20variants=20bypass=20session-shared=20trackers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Async-daemon parallel slice workers share a single DocumentSessionBase; the projection write path was still reaching into _session.Versions / _session.Revisions / _session.ItemMap from inside the parallel block. #4658 closed StoreProjection; this phase closes the remaining write sites that ProjectionStorage's Store(snapshot) and Store(snapshot, id, tenantId) routed through. Surface change: IDocumentStorage.UpsertProjected(T, string tenantId) IDocumentStorage.InsertProjected(T, string tenantId) IDocumentStorage.UpdateProjected(T, string tenantId) These mirror the existing OverwriteProjected pattern. The 9 writeable closed-shape storage leaves build their corresponding closed-shape op with null trackers; QueryOnly throws NotSupported; SubClass / ValueType / Event storages delegate or throw consistently with their existing OverwriteProjected implementations. Null-tolerance audit applied to all four ConcurrencyMode op classes that hold a tracker dictionary (Optimistic Insert/Update/Upsert + Numeric Insert/Update/ Upsert): _versions / _revisions are now nullable and the dict writes in PostprocessAsync are guarded. Optimistic Update/Upsert's ConfigureCommand treats a null tracker as "expected version unknown" and binds DBNull for the WHERE-guard slot. ProjectionStorage rewrites: * Store(snapshot) -> _storage.UpsertProjected(snapshot, TenantId) * Store(snapshot, id, tenantId) -> _storage.UpsertProjected(snapshot, tenantId) The GH-3850 identity-map maintenance (EjectAggregateFromIdentityMap + _storage.Store(_session, snapshot)) is preserved but gated on Options.EventGraph.UseIdentityMapForAggregates so the default (false) path takes the session-state-free write and the opt-in (true) path still gets correct inline-projection-rewriting-an-immutable-aggregate semantics. Per the #4667 Phase 3 design note, opt-in is documented as not safe for parallel projection workers — that race is accepted with the flag on. Verified locally: * DaemonTests net9.0 — 187 / 187 green * EventSourcingTests net9.0 — 1361 / 1368 green (7 pre-existing skips, including the GH-3850 regression which still passes) * CoreTests net9.0 — 421 / 422 green (1 pre-existing skip) Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Marten/Events/EventDocumentStorage.cs | 16 +++++++ src/Marten/Events/EventMapping.cs | 16 +++++++ .../NumericClosedShapeInsertOperation.cs | 12 +++-- .../NumericClosedShapeUpdateOperation.cs | 12 +++-- .../NumericClosedShapeUpsertOperation.cs | 12 +++-- .../NumericDirtyCheckedClosedShapeStorage.cs | 10 +++++ .../NumericIdentityMapClosedShapeStorage.cs | 10 +++++ .../NumericLightweightClosedShapeStorage.cs | 13 ++++++ .../OptimisticClosedShapeInsertOperation.cs | 12 +++-- .../OptimisticClosedShapeUpdateOperation.cs | 20 ++++++--- .../OptimisticClosedShapeUpsertOperation.cs | 20 ++++++--- ...ptimisticDirtyCheckedClosedShapeStorage.cs | 10 +++++ ...OptimisticIdentityMapClosedShapeStorage.cs | 10 +++++ ...OptimisticLightweightClosedShapeStorage.cs | 15 +++++++ .../QueryOnlyClosedShapeStorage.cs | 10 +++++ ...versionedDirtyCheckedClosedShapeStorage.cs | 10 +++++ ...nversionedIdentityMapClosedShapeStorage.cs | 10 +++++ ...nversionedLightweightClosedShapeStorage.cs | 12 +++++ .../DocumentSessionBase.ProjectionStorage.cs | 45 +++++++++++++------ .../Internal/Storage/DocumentStorage.cs | 9 ++++ .../Internal/Storage/IDocumentStorage.cs | 20 +++++++++ .../Storage/SubClassDocumentStorage.cs | 17 +++++++ .../ValueTypeIdentifiedDocumentStorage.cs | 10 +++++ 23 files changed, 296 insertions(+), 35 deletions(-) diff --git a/src/Marten/Events/EventDocumentStorage.cs b/src/Marten/Events/EventDocumentStorage.cs index 05da629d31..b28cdeff6c 100644 --- a/src/Marten/Events/EventDocumentStorage.cs +++ b/src/Marten/Events/EventDocumentStorage.cs @@ -228,6 +228,22 @@ public IStorageOperation OverwriteProjected(IEvent document, string tenant) throw new NotSupportedException(); } + // #4667 — events aren't projected through the document write path. + public IStorageOperation UpsertProjected(IEvent document, string tenant) + { + throw new NotSupportedException(); + } + + public IStorageOperation InsertProjected(IEvent document, string tenant) + { + throw new NotSupportedException(); + } + + public IStorageOperation UpdateProjected(IEvent document, string tenant) + { + throw new NotSupportedException(); + } + public abstract IStorageOperation AppendEvent(EventGraph events, IMartenSession session, StreamAction stream, IEvent e); diff --git a/src/Marten/Events/EventMapping.cs b/src/Marten/Events/EventMapping.cs index d3a991662a..548839235e 100644 --- a/src/Marten/Events/EventMapping.cs +++ b/src/Marten/Events/EventMapping.cs @@ -334,6 +334,22 @@ IStorageOperation IDocumentStorage.OverwriteProjected(T document, string tena throw new NotSupportedException(); } + // #4667 — events aren't projected through the document write path. + IStorageOperation IDocumentStorage.UpsertProjected(T document, string tenant) + { + throw new NotSupportedException(); + } + + IStorageOperation IDocumentStorage.InsertProjected(T document, string tenant) + { + throw new NotSupportedException(); + } + + IStorageOperation IDocumentStorage.UpdateProjected(T document, string tenant) + { + throw new NotSupportedException(); + } + public IDeletion DeleteForDocument(T document, string tenant) { throw new NotSupportedException(); diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeInsertOperation.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeInsertOperation.cs index 15446bd497..6db1d9df11 100644 --- a/src/Marten/Internal/ClosedShape/NumericClosedShapeInsertOperation.cs +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeInsertOperation.cs @@ -26,14 +26,14 @@ internal sealed class NumericClosedShapeInsertOperation: ClosedShapeI where TDoc : notnull where TId : notnull { - private readonly Dictionary _revisions; + private readonly Dictionary? _revisions; public NumericClosedShapeInsertOperation( TDoc document, TId id, string tenantId, DocumentStorageDescriptor descriptor, - Dictionary revisions) + Dictionary? revisions) : base(document, id, tenantId, descriptor) { _revisions = revisions; @@ -67,7 +67,13 @@ public override async Task PostprocessAsync(DbDataReader reader, IList(0, token).ConfigureAwait(false); - _revisions[_id] = newRevision; + // #4667 — null tracker (the InsertProjected path) skips the tracker + // write. RevisionBinder still applies so the document's revision + // field is fresh. + if (_revisions is not null) + { + _revisions[_id] = newRevision; + } _descriptor.RevisionBinder!.ApplyRevisionTo(_document, newRevision); } diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeUpdateOperation.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeUpdateOperation.cs index b109d0bb4c..7e922055ba 100644 --- a/src/Marten/Internal/ClosedShape/NumericClosedShapeUpdateOperation.cs +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeUpdateOperation.cs @@ -24,14 +24,14 @@ internal sealed class NumericClosedShapeUpdateOperation: ClosedShapeU where TDoc : notnull where TId : notnull { - private readonly Dictionary _revisions; + private readonly Dictionary? _revisions; public NumericClosedShapeUpdateOperation( TDoc document, TId id, string tenantId, DocumentStorageDescriptor descriptor, - Dictionary revisions) + Dictionary? revisions) : base(document, id, tenantId, descriptor) { _revisions = revisions; @@ -66,7 +66,13 @@ public override async Task PostprocessAsync(DbDataReader reader, IList(0, token).ConfigureAwait(false); - _revisions[_id] = newRevision; + // #4667 — null tracker (the UpdateProjected path) skips the tracker + // write. RevisionBinder still applies so the document's revision + // field is fresh. + if (_revisions is not null) + { + _revisions[_id] = newRevision; + } _descriptor.RevisionBinder!.ApplyRevisionTo(_document, newRevision); } diff --git a/src/Marten/Internal/ClosedShape/NumericClosedShapeUpsertOperation.cs b/src/Marten/Internal/ClosedShape/NumericClosedShapeUpsertOperation.cs index 9e9ef81666..1ae8f37a6f 100644 --- a/src/Marten/Internal/ClosedShape/NumericClosedShapeUpsertOperation.cs +++ b/src/Marten/Internal/ClosedShape/NumericClosedShapeUpsertOperation.cs @@ -27,7 +27,7 @@ internal sealed class NumericClosedShapeUpsertOperation: ClosedShapeU where TDoc : notnull where TId : notnull { - private readonly Dictionary _revisions; + private readonly Dictionary? _revisions; public NumericClosedShapeUpsertOperation( TDoc document, @@ -35,7 +35,7 @@ public NumericClosedShapeUpsertOperation( string tenantId, DocumentStorageDescriptor descriptor, OperationRole role, - Dictionary revisions) + Dictionary? revisions) : base(document, id, tenantId, descriptor, role) { _revisions = revisions; @@ -68,7 +68,13 @@ public override async Task PostprocessAsync(DbDataReader reader, IList(0, token).ConfigureAwait(false); - _revisions[_id] = newRevision; + // #4667 — null tracker (the UpsertProjected path) skips the tracker + // write. RevisionBinder still applies so the document's revision + // field is fresh. + if (_revisions is not null) + { + _revisions[_id] = newRevision; + } _descriptor.RevisionBinder!.ApplyRevisionTo(_document, newRevision); } diff --git a/src/Marten/Internal/ClosedShape/NumericDirtyCheckedClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/NumericDirtyCheckedClosedShapeStorage.cs index f55487a283..e0e03154c6 100644 --- a/src/Marten/Internal/ClosedShape/NumericDirtyCheckedClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/NumericDirtyCheckedClosedShapeStorage.cs @@ -33,6 +33,16 @@ public override IStorageOperation Overwrite(TDoc document, IMartenSession sessio public override IStorageOperation OverwriteProjected(TDoc document, string tenant) => new NumericClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + // #4667 — null revision tracker; see Lightweight peer for semantics. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => new NumericClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, null); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => new NumericClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, null); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => new NumericClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, null); + public override ISelector BuildSelector(IMartenSession session) => _descriptor.HierarchyMapping is not null ? new HierarchicalNumericClosedShapeDirtyTrackingSelector(session, _descriptor) diff --git a/src/Marten/Internal/ClosedShape/NumericIdentityMapClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/NumericIdentityMapClosedShapeStorage.cs index 8862e489b5..9c3b15bfc1 100644 --- a/src/Marten/Internal/ClosedShape/NumericIdentityMapClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/NumericIdentityMapClosedShapeStorage.cs @@ -33,6 +33,16 @@ public override IStorageOperation Overwrite(TDoc document, IMartenSession sessio public override IStorageOperation OverwriteProjected(TDoc document, string tenant) => new NumericClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + // #4667 — null revision tracker; see Lightweight peer for semantics. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => new NumericClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, null); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => new NumericClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, null); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => new NumericClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, null); + public override ISelector BuildSelector(IMartenSession session) => _descriptor.HierarchyMapping is not null ? new HierarchicalNumericClosedShapeIdentityMapSelector(session, _descriptor) diff --git a/src/Marten/Internal/ClosedShape/NumericLightweightClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/NumericLightweightClosedShapeStorage.cs index 630ab69a45..2a770c0f71 100644 --- a/src/Marten/Internal/ClosedShape/NumericLightweightClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/NumericLightweightClosedShapeStorage.cs @@ -38,6 +38,19 @@ public override IStorageOperation OverwriteProjected(TDoc document, string tenan // the session's numeric-revision map for the projected doc. => new NumericClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + // #4667 — null revision tracker. Numeric ops bind the IRevisionedOperation + // `Revision` property (default 0) in ConfigureCommand, so the WHERE guard + // `? = 0 OR table.mt_version < ?` always passes when the caller leaves + // Revision at the default. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => new NumericClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, null); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => new NumericClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, null); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => new NumericClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, null); + public override ISelector BuildSelector(IMartenSession session) => _descriptor.HierarchyMapping is not null ? new HierarchicalNumericClosedShapeLightweightSelector(session, _descriptor) diff --git a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeInsertOperation.cs b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeInsertOperation.cs index 6ce0426f71..2cc24e246a 100644 --- a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeInsertOperation.cs +++ b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeInsertOperation.cs @@ -25,7 +25,7 @@ internal sealed class OptimisticClosedShapeInsertOperation: ClosedSha where TDoc : notnull where TId : notnull { - private readonly Dictionary _versions; + private readonly Dictionary? _versions; private readonly Guid _newVersion; public OptimisticClosedShapeInsertOperation( @@ -33,7 +33,7 @@ public OptimisticClosedShapeInsertOperation( TId id, string tenantId, DocumentStorageDescriptor descriptor, - Dictionary versions) + Dictionary? versions) : base(document, id, tenantId, descriptor) { _versions = versions; @@ -69,6 +69,12 @@ public override async Task PostprocessAsync(DbDataReader reader, IList: ClosedSha where TDoc : notnull where TId : notnull { - private readonly Dictionary _versions; + private readonly Dictionary? _versions; private readonly Guid _newVersion; public OptimisticClosedShapeUpdateOperation( @@ -34,7 +34,7 @@ public OptimisticClosedShapeUpdateOperation( TId id, string tenantId, DocumentStorageDescriptor descriptor, - Dictionary versions) + Dictionary? versions) : base(document, id, tenantId, descriptor) { _versions = versions; @@ -46,8 +46,12 @@ public override void ConfigureCommand(ICommandBuilder builder, IMartenSession se var parameters = builder.AppendWithParameters(_descriptor.UpdateSql, '?'); var slot = BindPreConcurrencyParameters(parameters, session); - // Trailing WHERE mt_version = ? guard. - if (_versions.TryGetValue(_id, out var expected)) + // Trailing WHERE mt_version = ? guard. #4667 — null tracker (the + // UpdateProjected path) treats expected version as DBNull, which the + // SQL WHERE never matches. Callers that go through UpdateProjected + // should also set IgnoreConcurrencyViolation = true to suppress the + // resulting "no row" exception. + if (_versions is not null && _versions.TryGetValue(_id, out var expected)) { parameters[slot].Value = expected; } @@ -69,7 +73,13 @@ public override async Task PostprocessAsync(DbDataReader reader, IList binder, IMartenSession session) diff --git a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpsertOperation.cs b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpsertOperation.cs index 95accfa033..63edd63564 100644 --- a/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpsertOperation.cs +++ b/src/Marten/Internal/ClosedShape/OptimisticClosedShapeUpsertOperation.cs @@ -27,7 +27,7 @@ internal sealed class OptimisticClosedShapeUpsertOperation: ClosedSha where TDoc : notnull where TId : notnull { - private readonly Dictionary _versions; + private readonly Dictionary? _versions; private readonly Guid _newVersion; public OptimisticClosedShapeUpsertOperation( @@ -36,7 +36,7 @@ public OptimisticClosedShapeUpsertOperation( string tenantId, DocumentStorageDescriptor descriptor, OperationRole role, - Dictionary versions) + Dictionary? versions) : base(document, id, tenantId, descriptor, role) { _versions = versions; @@ -48,8 +48,12 @@ public override void ConfigureCommand(ICommandBuilder builder, IMartenSession se var parameters = builder.AppendWithParameters(_descriptor.UpsertSql, '?'); var slot = BindPreOnConflictParameters(parameters, session); - // Trailing WHERE table.mt_version = ? guard. - if (_versions.TryGetValue(_id, out var expected)) + // Trailing WHERE table.mt_version = ? guard. #4667 — null tracker + // (the UpsertProjected path) treats expected version as DBNull. On + // the ON CONFLICT branch the WHERE will never match, so existing + // rows in Optimistic mode are left untouched. The INSERT branch + // still fires for new rows. + if (_versions is not null && _versions.TryGetValue(_id, out var expected)) { parameters[slot].Value = expected; } @@ -71,7 +75,13 @@ public override async Task PostprocessAsync(DbDataReader reader, IList binder, IMartenSession session) diff --git a/src/Marten/Internal/ClosedShape/OptimisticDirtyCheckedClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/OptimisticDirtyCheckedClosedShapeStorage.cs index e7922d16a7..50c0441e54 100644 --- a/src/Marten/Internal/ClosedShape/OptimisticDirtyCheckedClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/OptimisticDirtyCheckedClosedShapeStorage.cs @@ -33,6 +33,16 @@ public override IStorageOperation Overwrite(TDoc document, IMartenSession sessio public override IStorageOperation OverwriteProjected(TDoc document, string tenant) => new OptimisticClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + // #4667 — null version tracker; see Lightweight peer for semantics. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => new OptimisticClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, null); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => new OptimisticClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, null); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => new OptimisticClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, null); + public override ISelector BuildSelector(IMartenSession session) => _descriptor.HierarchyMapping is not null ? new HierarchicalOptimisticClosedShapeDirtyTrackingSelector(session, _descriptor) diff --git a/src/Marten/Internal/ClosedShape/OptimisticIdentityMapClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/OptimisticIdentityMapClosedShapeStorage.cs index 5d2cab781d..034c9789ea 100644 --- a/src/Marten/Internal/ClosedShape/OptimisticIdentityMapClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/OptimisticIdentityMapClosedShapeStorage.cs @@ -33,6 +33,16 @@ public override IStorageOperation Overwrite(TDoc document, IMartenSession sessio public override IStorageOperation OverwriteProjected(TDoc document, string tenant) => new OptimisticClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + // #4667 — null version tracker; see Lightweight peer for semantics. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => new OptimisticClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, null); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => new OptimisticClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, null); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => new OptimisticClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, null); + public override ISelector BuildSelector(IMartenSession session) => _descriptor.HierarchyMapping is not null ? new HierarchicalOptimisticClosedShapeIdentityMapSelector(session, _descriptor) diff --git a/src/Marten/Internal/ClosedShape/OptimisticLightweightClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/OptimisticLightweightClosedShapeStorage.cs index a96c6f25f0..4d544baabe 100644 --- a/src/Marten/Internal/ClosedShape/OptimisticLightweightClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/OptimisticLightweightClosedShapeStorage.cs @@ -38,6 +38,21 @@ public override IStorageOperation OverwriteProjected(TDoc document, string tenan // the session's optimistic-version map for the projected doc. => new OptimisticClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor, null); + // #4667 — projection write paths pass null tracker (no session-shared + // dict access). Optimistic Upsert/Update with null tracker bind DBNull + // for the WHERE-guard; existing rows are left untouched on the ON CONFLICT + // branch. Callers in the projection runtime set IgnoreConcurrencyViolation + // = true (see ProjectionStorage.StoreProjection / Store flows) to suppress + // the resulting "no row" exception. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => new OptimisticClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert, null); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => new OptimisticClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor, null); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => new OptimisticClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor, null); + public override ISelector BuildSelector(IMartenSession session) => _descriptor.HierarchyMapping is not null ? new HierarchicalOptimisticClosedShapeLightweightSelector(session, _descriptor) diff --git a/src/Marten/Internal/ClosedShape/QueryOnlyClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/QueryOnlyClosedShapeStorage.cs index 3a4cc1b669..aad3f7c992 100644 --- a/src/Marten/Internal/ClosedShape/QueryOnlyClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/QueryOnlyClosedShapeStorage.cs @@ -61,6 +61,16 @@ public override IStorageOperation Overwrite(TDoc document, IMartenSession sessio public override IStorageOperation OverwriteProjected(TDoc document, string tenant) => throw new NotSupportedException("QueryOnly storage doesn't support OverwriteProjected."); + // #4667 — projection write paths aren't reachable from query sessions. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => throw new NotSupportedException("QueryOnly storage doesn't support UpsertProjected."); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => throw new NotSupportedException("QueryOnly storage doesn't support InsertProjected."); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => throw new NotSupportedException("QueryOnly storage doesn't support UpdateProjected."); + public override ISelector BuildSelector(IMartenSession session) // #4659 Phase 2: pick the Flat / Hierarchical selector ONCE per // query — neither selector class branches on HierarchyMapping per diff --git a/src/Marten/Internal/ClosedShape/UnversionedDirtyCheckedClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/UnversionedDirtyCheckedClosedShapeStorage.cs index de5160eab6..a8b183bf4d 100644 --- a/src/Marten/Internal/ClosedShape/UnversionedDirtyCheckedClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/UnversionedDirtyCheckedClosedShapeStorage.cs @@ -33,6 +33,16 @@ public override IStorageOperation Overwrite(TDoc document, IMartenSession sessio public override IStorageOperation OverwriteProjected(TDoc document, string tenant) => new UnversionedClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor); + // #4667 — Unversioned ops have no tracker plumbing; see Lightweight peer. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => new UnversionedClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => new UnversionedClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => new UnversionedClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor); + public override ISelector BuildSelector(IMartenSession session) => _descriptor.HierarchyMapping is not null ? new HierarchicalUnversionedClosedShapeDirtyTrackingSelector(session, _descriptor) diff --git a/src/Marten/Internal/ClosedShape/UnversionedIdentityMapClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/UnversionedIdentityMapClosedShapeStorage.cs index bcb3fda9d7..a6e9680627 100644 --- a/src/Marten/Internal/ClosedShape/UnversionedIdentityMapClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/UnversionedIdentityMapClosedShapeStorage.cs @@ -33,6 +33,16 @@ public override IStorageOperation Overwrite(TDoc document, IMartenSession sessio public override IStorageOperation OverwriteProjected(TDoc document, string tenant) => new UnversionedClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor); + // #4667 — Unversioned ops have no tracker plumbing; see Lightweight peer. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => new UnversionedClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => new UnversionedClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => new UnversionedClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor); + public override ISelector BuildSelector(IMartenSession session) => _descriptor.HierarchyMapping is not null ? new HierarchicalUnversionedClosedShapeIdentityMapSelector(session, _descriptor) diff --git a/src/Marten/Internal/ClosedShape/UnversionedLightweightClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/UnversionedLightweightClosedShapeStorage.cs index a748a5b0f1..fcaeb91367 100644 --- a/src/Marten/Internal/ClosedShape/UnversionedLightweightClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/UnversionedLightweightClosedShapeStorage.cs @@ -36,6 +36,18 @@ public override IStorageOperation Overwrite(TDoc document, IMartenSession sessio public override IStorageOperation OverwriteProjected(TDoc document, string tenant) => new UnversionedClosedShapeOverwriteOperation(document, Identity(document), tenant, _descriptor); + // #4667 — Unversioned ops have no tracker plumbing to begin with, so + // the *Projected factories return the same op as their session-aware + // counterparts. They exist for API uniformity across concurrency modes. + public override IStorageOperation UpsertProjected(TDoc document, string tenant) + => new UnversionedClosedShapeUpsertOperation(document, Identity(document), tenant, _descriptor, OperationRole.Upsert); + + public override IStorageOperation InsertProjected(TDoc document, string tenant) + => new UnversionedClosedShapeInsertOperation(document, Identity(document), tenant, _descriptor); + + public override IStorageOperation UpdateProjected(TDoc document, string tenant) + => new UnversionedClosedShapeUpdateOperation(document, Identity(document), tenant, _descriptor); + public override ISelector BuildSelector(IMartenSession session) => _descriptor.HierarchyMapping is not null ? new HierarchicalUnversionedClosedShapeLightweightSelector(session, _descriptor) diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.ProjectionStorage.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.ProjectionStorage.cs index 766a8cd672..01dd575132 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.ProjectionStorage.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.ProjectionStorage.cs @@ -83,7 +83,10 @@ public void UnDelete(TDoc snapshot) public void Store(TDoc snapshot) { - var upsert = _storage.Upsert(snapshot, _session, TenantId); + // #4667 — UpsertProjected (not Upsert) so we never read or mutate + // _session.Versions / _session.Revisions from a daemon worker. The + // projection runtime is by-contract not session-state-aware. + var upsert = _storage.UpsertProjected(snapshot, TenantId); _session.QueueOperation(upsert); } @@ -118,19 +121,35 @@ public void Store(TDoc snapshot, TId id, string tenantId) { _storage.SetIdentity(snapshot, id); - // The aggregate may already be in the identity map from a prior SaveChangesAsync - // on the same session — for example, a FetchForWriting → save → StartStream - // sequence. In that case the projection has built a NEW snapshot instance for - // this save, and the duplicate-instance guard in IdentityMapDocumentStorage.store - // would throw before the underlying event store can surface the real conflict - // (ExistingStreamIdCollisionException). Evict the stale entry so the new snapshot - // can take its place. - _session.EjectAggregateFromIdentityMap(id); - - // Put it in the identity map -- if necessary - _storage.Store(_session, snapshot); + // #4667 — The ItemMap eject + IdentityMap-storage Store call below is + // the GH-3850 fix: inline-projection-rewriting-an-immutable-aggregate + // on an IdentitySession needs the freshly-built snapshot instance to + // replace the stale identity-map entry so a subsequent FetchLatest on + // the same session sees the new state. That ItemMap mutation is a race + // source under the async daemon's parallel Block(10, ...) workers + // (#4657), but the daemon never opts into UseIdentityMapForAggregates, + // so we gate the identity-map maintenance on the same flag. The + // default (false) case takes the session-state-free UpsertProjected + // path; the opt-in (true) case preserves the GH-3850 semantics and + // accepts the documented race risk per the #4667 Phase 3 design note + // ("opt-in is not safe for parallel projection workers"). + if (_session.Options.EventGraph.UseIdentityMapForAggregates) + { + // The aggregate may already be in the identity map from a prior + // SaveChangesAsync on the same session — for example, a + // FetchForWriting → save → StartStream sequence. In that case + // the projection has built a NEW snapshot instance for this save + // and the duplicate-instance guard in IdentityMapDocumentStorage.store + // would throw before the underlying event store can surface the + // real conflict (ExistingStreamIdCollisionException). Evict the + // stale entry so the new snapshot can take its place. + _session.EjectAggregateFromIdentityMap(id); + + // Put it in the identity map -- if necessary + _storage.Store(_session, snapshot); + } - var upsert = _storage.Upsert(snapshot, _session, tenantId); + var upsert = _storage.UpsertProjected(snapshot, tenantId); _session.QueueOperation(upsert); } diff --git a/src/Marten/Internal/Storage/DocumentStorage.cs b/src/Marten/Internal/Storage/DocumentStorage.cs index f1325c0b29..7a6f824525 100644 --- a/src/Marten/Internal/Storage/DocumentStorage.cs +++ b/src/Marten/Internal/Storage/DocumentStorage.cs @@ -295,6 +295,15 @@ object IDocumentStorage.IdentityFor(T document) /// public abstract IStorageOperation OverwriteProjected(T document, string tenant); + /// + public abstract IStorageOperation UpsertProjected(T document, string tenant); + + /// + public abstract IStorageOperation InsertProjected(T document, string tenant); + + /// + public abstract IStorageOperation UpdateProjected(T document, string tenant); + public IDeletion DeleteForDocument(T document, string tenant) { var id = Identity(document); diff --git a/src/Marten/Internal/Storage/IDocumentStorage.cs b/src/Marten/Internal/Storage/IDocumentStorage.cs index 58a8e68037..d3f233b5fc 100644 --- a/src/Marten/Internal/Storage/IDocumentStorage.cs +++ b/src/Marten/Internal/Storage/IDocumentStorage.cs @@ -120,6 +120,26 @@ public interface IDocumentStorage: IDocumentStorage where T : notnull /// IStorageOperation OverwriteProjected(T document, string tenantId); + /// + /// Session-free Upsert for projection storage (#4667 Phase 1). Builds the same Upsert + /// operation as but passes a null version/revision tracker so the + /// projection path never touches . Safe to call + /// from parallel async-daemon slice handlers that share an . + /// + IStorageOperation UpsertProjected(T document, string tenantId); + + /// + /// Session-free Insert for projection storage (#4667 Phase 1). See + /// . + /// + IStorageOperation InsertProjected(T document, string tenantId); + + /// + /// Session-free Update for projection storage (#4667 Phase 1). See + /// . + /// + IStorageOperation UpdateProjected(T document, string tenantId); + IDeletion DeleteForDocument(T document, string tenantId); diff --git a/src/Marten/Internal/Storage/SubClassDocumentStorage.cs b/src/Marten/Internal/Storage/SubClassDocumentStorage.cs index 37def2d102..5bae400c1b 100644 --- a/src/Marten/Internal/Storage/SubClassDocumentStorage.cs +++ b/src/Marten/Internal/Storage/SubClassDocumentStorage.cs @@ -172,6 +172,23 @@ public IStorageOperation OverwriteProjected(T document, string tenant) return _parent.OverwriteProjected(document, tenant); } + // #4667 — delegate the new projection write entry points to the parent + // hierarchy storage just like Overwrite/OverwriteProjected do. + public IStorageOperation UpsertProjected(T document, string tenant) + { + return _parent.UpsertProjected(document, tenant); + } + + public IStorageOperation InsertProjected(T document, string tenant) + { + return _parent.InsertProjected(document, tenant); + } + + public IStorageOperation UpdateProjected(T document, string tenant) + { + return _parent.UpdateProjected(document, tenant); + } + public IDeletion DeleteForDocument(T document, string tenant) { return _parent.DeleteForDocument(document, tenant); diff --git a/src/Marten/Internal/ValueTypeIdentifiedDocumentStorage.cs b/src/Marten/Internal/ValueTypeIdentifiedDocumentStorage.cs index 58e9ce0756..f63e660941 100644 --- a/src/Marten/Internal/ValueTypeIdentifiedDocumentStorage.cs +++ b/src/Marten/Internal/ValueTypeIdentifiedDocumentStorage.cs @@ -141,6 +141,16 @@ public IStorageOperation Overwrite(TDoc document, IMartenSession session, string public IStorageOperation OverwriteProjected(TDoc document, string tenantId) => Inner.OverwriteProjected(document, tenantId); + // #4667 — delegate the projection write entry points. + public IStorageOperation UpsertProjected(TDoc document, string tenantId) + => Inner.UpsertProjected(document, tenantId); + + public IStorageOperation InsertProjected(TDoc document, string tenantId) + => Inner.InsertProjected(document, tenantId); + + public IStorageOperation UpdateProjected(TDoc document, string tenantId) + => Inner.UpdateProjected(document, tenantId); + public IDeletion DeleteForDocument(TDoc document, string tenantId) => Inner.DeleteForDocument(document, tenantId);