diff --git a/src/Marten/Internal/ClosedShape/ClosedShapeProjectionLoader.cs b/src/Marten/Internal/ClosedShape/ClosedShapeProjectionLoader.cs new file mode 100644 index 0000000000..f00894bdda --- /dev/null +++ b/src/Marten/Internal/ClosedShape/ClosedShapeProjectionLoader.cs @@ -0,0 +1,124 @@ +#nullable enable +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Marten.Schema; +using Marten.Storage; +using Npgsql; + +namespace Marten.Internal.ClosedShape; + +/// +/// Shared projection-safe load implementation for closed-shape document +/// storages (#4667 Phase 2). Opens a fresh connection from the +/// , executes the load SQL, and deserializes +/// the data column directly via — without any +/// reference, without writing to +/// VersionTracker / ItemMap / ChangeTrackers, and +/// without calling MarkAsDocumentLoaded. +/// +/// +/// +/// The projection-safe selector path intentionally skips metadata binders +/// (CreatedAt / LastModified / Headers / etc.). Projections care about the +/// aggregate state encoded in the data column for their Apply/Evolve hot +/// path; per-row metadata is not part of that contract. If a future +/// projection scenario needs metadata it can be added here as a focused +/// follow-up. +/// +/// +/// Hierarchical storages dispatch deserialization through the +/// alias-to-.NET-type lookup, +/// mirroring . +/// +/// +internal static class ClosedShapeProjectionLoader + where TDoc : notnull + where TId : notnull +{ + // Column layout matches the writeable closed-shape selectors + // (Lightweight / IdentityMap / DirtyChecked) since LoadProjectedAsync is + // only reached from those storages — see + // . QueryOnly storage + // has a different layout (id excluded, data at col 0) but doesn't + // implement LoadProjectedAsync; it throws NotSupportedException instead. + private const int IdColumn = 0; + private const int DataColumn = 1; + private const int FirstMetadataColumn = 2; + + public static async Task LoadAsync( + NpgsqlCommand command, + DocumentStorageDescriptor descriptor, + ISerializer serializer, + IMartenDatabase database, + CancellationToken token) + { + await using var conn = database.CreateConnection(); + await conn.OpenAsync(token).ConfigureAwait(false); + try + { + command.Connection = conn; + await using var reader = await command.ExecuteReaderAsync(token).ConfigureAwait(false); + if (!await reader.ReadAsync(token).ConfigureAwait(false)) + { + return default; + } + + return await readOneAsync(reader, descriptor, serializer, token).ConfigureAwait(false); + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } + } + + public static async Task> LoadManyAsync( + NpgsqlCommand command, + DocumentStorageDescriptor descriptor, + ISerializer serializer, + IMartenDatabase database, + CancellationToken token) + { + await using var conn = database.CreateConnection(); + await conn.OpenAsync(token).ConfigureAwait(false); + try + { + command.Connection = conn; + await using var reader = await command.ExecuteReaderAsync(token).ConfigureAwait(false); + var list = new List(); + while (await reader.ReadAsync(token).ConfigureAwait(false)) + { + var doc = await readOneAsync(reader, descriptor, serializer, token).ConfigureAwait(false); + if (doc is not null) + { + list.Add(doc); + } + } + return list; + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } + } + + private static async ValueTask readOneAsync( + DbDataReader reader, + DocumentStorageDescriptor descriptor, + ISerializer serializer, + CancellationToken token) + { + // Hierarchical: dispatch via mt_doc_type alias just like + // HierarchicalClosedShapeQueryOnlySelector. Flat: straight deserialize. + if (descriptor.HierarchyMapping is { } hierarchy) + { + var docTypeOrdinal = FirstMetadataColumn + descriptor.DocTypeReadIndex; + var alias = await reader.GetFieldValueAsync(docTypeOrdinal, token).ConfigureAwait(false); + return (TDoc)await serializer.FromJsonAsync(hierarchy.TypeFor(alias), reader, DataColumn, token).ConfigureAwait(false); + } + + return await serializer.FromJsonAsync(reader, DataColumn, token).ConfigureAwait(false); + } +} diff --git a/src/Marten/Internal/ClosedShape/DirtyCheckedClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/DirtyCheckedClosedShapeStorage.cs index 02935e5773..6fcd6655ad 100644 --- a/src/Marten/Internal/ClosedShape/DirtyCheckedClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/DirtyCheckedClosedShapeStorage.cs @@ -1,4 +1,7 @@ #nullable enable +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; using Marten.Internal; using Marten.Internal.Operations; using Marten.Internal.Storage; @@ -37,4 +40,13 @@ public override object RawIdentityValue(TId id) public override Npgsql.NpgsqlParameter BuildManyIdParameter(TId[] ids) => ClosedShapeIdHelpers.BuildManyIdParameter(ids, _descriptor.Identification); + + // #4667 Phase 2 — session-free projection load; see Lightweight peer. + public override Task LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token) + => ClosedShapeProjectionLoader.LoadAsync( + BuildLoadCommand(id, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token); + + public override Task> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token) + => ClosedShapeProjectionLoader.LoadManyAsync( + BuildLoadManyCommand(ids, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token); } diff --git a/src/Marten/Internal/ClosedShape/IdentityMapClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/IdentityMapClosedShapeStorage.cs index 15273fc01a..e3c95a7f6f 100644 --- a/src/Marten/Internal/ClosedShape/IdentityMapClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/IdentityMapClosedShapeStorage.cs @@ -1,4 +1,7 @@ #nullable enable +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; using Marten.Internal; using Marten.Internal.Operations; using Marten.Internal.Storage; @@ -37,4 +40,13 @@ public override object RawIdentityValue(TId id) public override Npgsql.NpgsqlParameter BuildManyIdParameter(TId[] ids) => ClosedShapeIdHelpers.BuildManyIdParameter(ids, _descriptor.Identification); + + // #4667 Phase 2 — session-free projection load; see Lightweight peer. + public override Task LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token) + => ClosedShapeProjectionLoader.LoadAsync( + BuildLoadCommand(id, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token); + + public override Task> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token) + => ClosedShapeProjectionLoader.LoadManyAsync( + BuildLoadManyCommand(ids, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token); } diff --git a/src/Marten/Internal/ClosedShape/LightweightClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/LightweightClosedShapeStorage.cs index 61ee51dff0..fa24893fd5 100644 --- a/src/Marten/Internal/ClosedShape/LightweightClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/LightweightClosedShapeStorage.cs @@ -1,4 +1,7 @@ #nullable enable +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; using Marten.Internal; using Marten.Internal.Operations; using Marten.Internal.Storage; @@ -52,4 +55,16 @@ public override object RawIdentityValue(TId id) public override Npgsql.NpgsqlParameter BuildManyIdParameter(TId[] ids) => ClosedShapeIdHelpers.BuildManyIdParameter(ids, _descriptor.Identification); + + // #4667 Phase 2 — session-free projection load. Opens a fresh connection + // from the supplied database and deserializes the data column directly, + // bypassing the session-aware BuildSelector path that writes versions / + // ItemMap / ChangeTrackers per row. Shared with IdentityMap / DirtyChecked. + public override Task LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token) + => ClosedShapeProjectionLoader.LoadAsync( + BuildLoadCommand(id, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token); + + public override Task> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token) + => ClosedShapeProjectionLoader.LoadManyAsync( + BuildLoadManyCommand(ids, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token); } diff --git a/src/Marten/Internal/ClosedShape/QueryOnlyClosedShapeStorage.cs b/src/Marten/Internal/ClosedShape/QueryOnlyClosedShapeStorage.cs index aad3f7c992..531eac5bc0 100644 --- a/src/Marten/Internal/ClosedShape/QueryOnlyClosedShapeStorage.cs +++ b/src/Marten/Internal/ClosedShape/QueryOnlyClosedShapeStorage.cs @@ -71,6 +71,15 @@ public override IStorageOperation InsertProjected(TDoc document, string tenant) public override IStorageOperation UpdateProjected(TDoc document, string tenant) => throw new NotSupportedException("QueryOnly storage doesn't support UpdateProjected."); + // #4667 Phase 2 — QueryOnly storages aren't used by the projection read + // path; ProjectionStorage holds a writeable storage instance for the + // projected document type, not a QueryOnly one. + public override System.Threading.Tasks.Task LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, System.Threading.CancellationToken token) + => throw new NotSupportedException("QueryOnly storage doesn't support LoadProjectedAsync."); + + public override System.Threading.Tasks.Task> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, System.Threading.CancellationToken token) + => throw new NotSupportedException("QueryOnly storage doesn't support LoadManyProjectedAsync."); + public override ISelector BuildSelector(IMartenSession session) // #4659 Phase 2: pick the Flat / Hierarchical selector ONCE per // query — neither selector class branches on HierarchyMapping per diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.ProjectionStorage.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.ProjectionStorage.cs index 01dd575132..2b3cc94462 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.ProjectionStorage.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.ProjectionStorage.cs @@ -161,7 +161,17 @@ public void Delete(TId identity, string tenantId) public async Task> LoadManyAsync(TId[] identities, CancellationToken cancellationToken) { - var docs = await _storage.LoadManyAsync(identities, _session, cancellationToken).ConfigureAwait(false); + // #4667 Phase 2 — default (UseIdentityMapForAggregates = false) routes + // through LoadManyProjectedAsync, which opens a fresh connection and + // uses a tracker-free deserialization path so the async daemon's + // parallel slice workers never touch _session.Versions / + // _session.ItemMap / _session.ChangeTrackers. The opt-in (true) case + // preserves the inline-projection identity-map semantics by falling + // through to the session-aware LoadManyAsync — at the cost of the + // race the flag's design note already documents. + var docs = _session.Options.EventGraph.UseIdentityMapForAggregates + ? await _storage.LoadManyAsync(identities, _session, cancellationToken).ConfigureAwait(false) + : await _storage.LoadManyProjectedAsync(identities, _session.Database, TenantId, cancellationToken).ConfigureAwait(false); return docs.ToDictionary(doc => _storage.Identity(doc)); } @@ -240,6 +250,9 @@ private Func archiveOperationBuilderFor() //TODO fix in IProjectionStorage public Task LoadAsync(TId id, CancellationToken cancellation) { - return _storage.LoadAsync(id, _session, cancellation); + // #4667 Phase 2 — see LoadManyAsync above for rationale. + return _session.Options.EventGraph.UseIdentityMapForAggregates + ? _storage.LoadAsync(id, _session, cancellation) + : _storage.LoadProjectedAsync(id, _session.Database, TenantId, cancellation); } } diff --git a/src/Marten/Internal/Storage/DocumentStorage.cs b/src/Marten/Internal/Storage/DocumentStorage.cs index 7a6f824525..cb7a272191 100644 --- a/src/Marten/Internal/Storage/DocumentStorage.cs +++ b/src/Marten/Internal/Storage/DocumentStorage.cs @@ -356,6 +356,22 @@ public ISqlFragment FilterDocuments(ISqlFragment query, IMartenSession session) public abstract Task> LoadManyAsync(TId[] ids, IMartenSession session, CancellationToken token); + /// + /// + /// Default impl throws — closed-shape storages (the projection-eligible + /// path) override with a fresh-connection read that bypasses session-shared + /// trackers. Non-closed-shape storages aren't reachable via the projection + /// write path and don't need this overload. + /// + public virtual Task LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token) + => throw new NotSupportedException( + $"{GetType().Name} doesn't implement LoadProjectedAsync. Closed-shape storage variants provide this for the async-daemon projection-safe read path (#4667 Phase 2)."); + + /// + public virtual Task> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token) + => throw new NotSupportedException( + $"{GetType().Name} doesn't implement LoadManyProjectedAsync. Closed-shape storage variants provide this for the async-daemon projection-safe read path (#4667 Phase 2)."); + [MethodImpl(MethodImplOptions.AggressiveInlining)] public abstract TId Identity(T document); diff --git a/src/Marten/Internal/Storage/IDocumentStorage.cs b/src/Marten/Internal/Storage/IDocumentStorage.cs index d3f233b5fc..5c78bb2083 100644 --- a/src/Marten/Internal/Storage/IDocumentStorage.cs +++ b/src/Marten/Internal/Storage/IDocumentStorage.cs @@ -159,6 +159,23 @@ public interface IDocumentStorage: IDocumentStorage, IIdentitySetter< Task> LoadManyAsync(TId[] ids, IMartenSession session, CancellationToken token); + /// + /// Session-free Load for projection storage (#4667 Phase 2). Opens a fresh + /// connection from the database, executes the load SQL, and returns the + /// deserialized document. Does not touch any session-shared state — no + /// version/revision tracker writes, no ItemMap updates, no + /// MarkAsDocumentLoaded, no ChangeTrackers writes. Safe to call + /// from parallel async-daemon slice handlers that share an + /// . + /// + Task LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token); + + /// + /// Session-free LoadMany for projection storage (#4667 Phase 2). See + /// . + /// + Task> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token); + TId AssignIdentity(T document, string tenantId, IMartenDatabase database); ISqlFragment ByIdFilter(TId id); diff --git a/src/Marten/Internal/Storage/SubClassDocumentStorage.cs b/src/Marten/Internal/Storage/SubClassDocumentStorage.cs index 5bae400c1b..85bf5cf50e 100644 --- a/src/Marten/Internal/Storage/SubClassDocumentStorage.cs +++ b/src/Marten/Internal/Storage/SubClassDocumentStorage.cs @@ -221,6 +221,19 @@ public async Task> LoadManyAsync(TId[] ids, IMartenSession sess return (await _parent.LoadManyAsync(ids, session, token).ConfigureAwait(false)).OfType().ToList(); } + // #4667 Phase 2 — delegate projection loads to the parent hierarchy storage + // and downcast to the subclass like the session-aware path above. + public async Task LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token) + { + var doc = await _parent.LoadProjectedAsync(id, database, tenantId, token).ConfigureAwait(false); + return doc is T x ? x : default; + } + + public async Task> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token) + { + return (await _parent.LoadManyProjectedAsync(ids, database, tenantId, token).ConfigureAwait(false)).OfType().ToList(); + } + public TId AssignIdentity(T document, string tenantId, IMartenDatabase database) { return _parent.AssignIdentity(document, tenantId, database); diff --git a/src/Marten/Internal/ValueTypeIdentifiedDocumentStorage.cs b/src/Marten/Internal/ValueTypeIdentifiedDocumentStorage.cs index f63e660941..351e7af352 100644 --- a/src/Marten/Internal/ValueTypeIdentifiedDocumentStorage.cs +++ b/src/Marten/Internal/ValueTypeIdentifiedDocumentStorage.cs @@ -181,6 +181,13 @@ public IDeletion DeleteForId(TSimple id, string tenantId) public Task> LoadManyAsync(TSimple[] ids, IMartenSession session, CancellationToken token) => Inner.LoadManyAsync(ids.Select(_converter).ToArray(), session, token); + // #4667 Phase 2 — delegate to inner with the unwrapped id like the session-aware path. + public Task LoadProjectedAsync(TSimple id, IMartenDatabase database, string tenantId, CancellationToken token) + => Inner.LoadProjectedAsync(_converter(id), database, tenantId, token); + + public Task> LoadManyProjectedAsync(TSimple[] ids, IMartenDatabase database, string tenantId, CancellationToken token) + => Inner.LoadManyProjectedAsync(ids.Select(_converter).ToArray(), database, tenantId, token); + public TSimple AssignIdentity(TDoc document, string tenantId, IMartenDatabase database) => _unwrapper(Inner.AssignIdentity(document, tenantId, database));