Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions src/Marten/Internal/ClosedShape/ClosedShapeProjectionLoader.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using Marten.Schema;
using Marten.Storage;
using Npgsql;

namespace Marten.Internal.ClosedShape;

/// <summary>
/// Shared projection-safe load implementation for closed-shape document
/// storages (#4667 Phase 2). Opens a fresh connection from the
/// <see cref="IMartenDatabase"/>, executes the load SQL, and deserializes
/// the data column directly via <see cref="ISerializer"/> — without any
/// <see cref="IMartenSession"/> reference, without writing to
/// <c>VersionTracker</c> / <c>ItemMap</c> / <c>ChangeTrackers</c>, and
/// without calling <c>MarkAsDocumentLoaded</c>.
/// </summary>
/// <remarks>
/// <para>
/// The projection-safe selector path intentionally skips metadata binders
/// (CreatedAt / LastModified / Headers / etc.). Projections care about the
/// aggregate state encoded in the data column for their Apply/Evolve hot
/// path; per-row metadata is not part of that contract. If a future
/// projection scenario needs metadata it can be added here as a focused
/// follow-up.
/// </para>
/// <para>
/// Hierarchical storages dispatch deserialization through the
/// <see cref="DocumentMapping.TypeFor"/> alias-to-.NET-type lookup,
/// mirroring <see cref="HierarchicalClosedShapeQueryOnlySelector{T,TId}"/>.
/// </para>
/// </remarks>
internal static class ClosedShapeProjectionLoader<TDoc, TId>
where TDoc : notnull
where TId : notnull
{
// Column layout matches the writeable closed-shape selectors
// (Lightweight / IdentityMap / DirtyChecked) since LoadProjectedAsync is
// only reached from those storages — see
// <see cref="ClosedShapeLightweightSelector{T,TId}"/>. QueryOnly storage
// has a different layout (id excluded, data at col 0) but doesn't
// implement LoadProjectedAsync; it throws NotSupportedException instead.
private const int IdColumn = 0;
private const int DataColumn = 1;
private const int FirstMetadataColumn = 2;

public static async Task<TDoc?> LoadAsync(
NpgsqlCommand command,
DocumentStorageDescriptor<TDoc, TId> descriptor,
ISerializer serializer,
IMartenDatabase database,
CancellationToken token)
{
await using var conn = database.CreateConnection();
await conn.OpenAsync(token).ConfigureAwait(false);
try
{
command.Connection = conn;
await using var reader = await command.ExecuteReaderAsync(token).ConfigureAwait(false);
if (!await reader.ReadAsync(token).ConfigureAwait(false))
{
return default;
}

return await readOneAsync(reader, descriptor, serializer, token).ConfigureAwait(false);
}
finally
{
await conn.CloseAsync().ConfigureAwait(false);
}
}

public static async Task<IReadOnlyList<TDoc>> LoadManyAsync(
NpgsqlCommand command,
DocumentStorageDescriptor<TDoc, TId> descriptor,
ISerializer serializer,
IMartenDatabase database,
CancellationToken token)
{
await using var conn = database.CreateConnection();
await conn.OpenAsync(token).ConfigureAwait(false);
try
{
command.Connection = conn;
await using var reader = await command.ExecuteReaderAsync(token).ConfigureAwait(false);
var list = new List<TDoc>();
while (await reader.ReadAsync(token).ConfigureAwait(false))
{
var doc = await readOneAsync(reader, descriptor, serializer, token).ConfigureAwait(false);
if (doc is not null)
{
list.Add(doc);
}
}
return list;
}
finally
{
await conn.CloseAsync().ConfigureAwait(false);
}
}

private static async ValueTask<TDoc> readOneAsync(
DbDataReader reader,
DocumentStorageDescriptor<TDoc, TId> descriptor,
ISerializer serializer,
CancellationToken token)
{
// Hierarchical: dispatch via mt_doc_type alias just like
// HierarchicalClosedShapeQueryOnlySelector. Flat: straight deserialize.
if (descriptor.HierarchyMapping is { } hierarchy)
{
var docTypeOrdinal = FirstMetadataColumn + descriptor.DocTypeReadIndex;
var alias = await reader.GetFieldValueAsync<string>(docTypeOrdinal, token).ConfigureAwait(false);
return (TDoc)await serializer.FromJsonAsync(hierarchy.TypeFor(alias), reader, DataColumn, token).ConfigureAwait(false);
}

return await serializer.FromJsonAsync<TDoc>(reader, DataColumn, token).ConfigureAwait(false);
}
}
12 changes: 12 additions & 0 deletions src/Marten/Internal/ClosedShape/DirtyCheckedClosedShapeStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#nullable enable
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Marten.Internal;
using Marten.Internal.Operations;
using Marten.Internal.Storage;
Expand Down Expand Up @@ -37,4 +40,13 @@ public override object RawIdentityValue(TId id)

public override Npgsql.NpgsqlParameter BuildManyIdParameter(TId[] ids)
=> ClosedShapeIdHelpers.BuildManyIdParameter(ids, _descriptor.Identification);

// #4667 Phase 2 — session-free projection load; see Lightweight peer.
public override Task<TDoc?> LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token)
=> ClosedShapeProjectionLoader<TDoc, TId>.LoadAsync(
BuildLoadCommand(id, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token);

public override Task<IReadOnlyList<TDoc>> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token)
=> ClosedShapeProjectionLoader<TDoc, TId>.LoadManyAsync(
BuildLoadManyCommand(ids, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token);
}
12 changes: 12 additions & 0 deletions src/Marten/Internal/ClosedShape/IdentityMapClosedShapeStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#nullable enable
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Marten.Internal;
using Marten.Internal.Operations;
using Marten.Internal.Storage;
Expand Down Expand Up @@ -37,4 +40,13 @@ public override object RawIdentityValue(TId id)

public override Npgsql.NpgsqlParameter BuildManyIdParameter(TId[] ids)
=> ClosedShapeIdHelpers.BuildManyIdParameter(ids, _descriptor.Identification);

// #4667 Phase 2 — session-free projection load; see Lightweight peer.
public override Task<TDoc?> LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token)
=> ClosedShapeProjectionLoader<TDoc, TId>.LoadAsync(
BuildLoadCommand(id, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token);

public override Task<IReadOnlyList<TDoc>> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token)
=> ClosedShapeProjectionLoader<TDoc, TId>.LoadManyAsync(
BuildLoadManyCommand(ids, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token);
}
15 changes: 15 additions & 0 deletions src/Marten/Internal/ClosedShape/LightweightClosedShapeStorage.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#nullable enable
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Marten.Internal;
using Marten.Internal.Operations;
using Marten.Internal.Storage;
Expand Down Expand Up @@ -52,4 +55,16 @@ public override object RawIdentityValue(TId id)

public override Npgsql.NpgsqlParameter BuildManyIdParameter(TId[] ids)
=> ClosedShapeIdHelpers.BuildManyIdParameter(ids, _descriptor.Identification);

// #4667 Phase 2 — session-free projection load. Opens a fresh connection
// from the supplied database and deserializes the data column directly,
// bypassing the session-aware BuildSelector path that writes versions /
// ItemMap / ChangeTrackers per row. Shared with IdentityMap / DirtyChecked.
public override Task<TDoc?> LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token)
=> ClosedShapeProjectionLoader<TDoc, TId>.LoadAsync(
BuildLoadCommand(id, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token);

public override Task<IReadOnlyList<TDoc>> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token)
=> ClosedShapeProjectionLoader<TDoc, TId>.LoadManyAsync(
BuildLoadManyCommand(ids, tenantId), _descriptor, _mapping.StoreOptions.Serializer(), database, token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ public override IStorageOperation InsertProjected(TDoc document, string tenant)
public override IStorageOperation UpdateProjected(TDoc document, string tenant)
=> throw new NotSupportedException("QueryOnly storage doesn't support UpdateProjected.");

// #4667 Phase 2 — QueryOnly storages aren't used by the projection read
// path; ProjectionStorage holds a writeable storage instance for the
// projected document type, not a QueryOnly one.
public override System.Threading.Tasks.Task<TDoc?> LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, System.Threading.CancellationToken token)
=> throw new NotSupportedException("QueryOnly storage doesn't support LoadProjectedAsync.");

public override System.Threading.Tasks.Task<System.Collections.Generic.IReadOnlyList<TDoc>> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, System.Threading.CancellationToken token)
=> throw new NotSupportedException("QueryOnly storage doesn't support LoadManyProjectedAsync.");

public override ISelector BuildSelector(IMartenSession session)
// #4659 Phase 2: pick the Flat / Hierarchical selector ONCE per
// query — neither selector class branches on HierarchyMapping per
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,17 @@ public void Delete(TId identity, string tenantId)

public async Task<IReadOnlyDictionary<TId, TDoc>> LoadManyAsync(TId[] identities, CancellationToken cancellationToken)
{
var docs = await _storage.LoadManyAsync(identities, _session, cancellationToken).ConfigureAwait(false);
// #4667 Phase 2 — default (UseIdentityMapForAggregates = false) routes
// through LoadManyProjectedAsync, which opens a fresh connection and
// uses a tracker-free deserialization path so the async daemon's
// parallel slice workers never touch _session.Versions /
// _session.ItemMap / _session.ChangeTrackers. The opt-in (true) case
// preserves the inline-projection identity-map semantics by falling
// through to the session-aware LoadManyAsync — at the cost of the
// race the flag's design note already documents.
var docs = _session.Options.EventGraph.UseIdentityMapForAggregates
? await _storage.LoadManyAsync(identities, _session, cancellationToken).ConfigureAwait(false)
: await _storage.LoadManyProjectedAsync(identities, _session.Database, TenantId, cancellationToken).ConfigureAwait(false);
return docs.ToDictionary(doc => _storage.Identity(doc));
}

Expand Down Expand Up @@ -240,6 +250,9 @@ private Func<TId, ArchiveStreamOperation> archiveOperationBuilderFor<TId>()
//TODO fix in IProjectionStorage
public Task<TDoc?> LoadAsync(TId id, CancellationToken cancellation)
{
return _storage.LoadAsync(id, _session, cancellation);
// #4667 Phase 2 — see LoadManyAsync above for rationale.
return _session.Options.EventGraph.UseIdentityMapForAggregates
? _storage.LoadAsync(id, _session, cancellation)
: _storage.LoadProjectedAsync(id, _session.Database, TenantId, cancellation);
}
}
16 changes: 16 additions & 0 deletions src/Marten/Internal/Storage/DocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,22 @@ public ISqlFragment FilterDocuments(ISqlFragment query, IMartenSession session)

public abstract Task<IReadOnlyList<T>> LoadManyAsync(TId[] ids, IMartenSession session, CancellationToken token);

/// <inheritdoc />
/// <remarks>
/// Default impl throws — closed-shape storages (the projection-eligible
/// path) override with a fresh-connection read that bypasses session-shared
/// trackers. Non-closed-shape storages aren't reachable via the projection
/// write path and don't need this overload.
/// </remarks>
public virtual Task<T?> LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token)
=> throw new NotSupportedException(
$"{GetType().Name} doesn't implement LoadProjectedAsync. Closed-shape storage variants provide this for the async-daemon projection-safe read path (#4667 Phase 2).");

/// <inheritdoc />
public virtual Task<IReadOnlyList<T>> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token)
=> throw new NotSupportedException(
$"{GetType().Name} doesn't implement LoadManyProjectedAsync. Closed-shape storage variants provide this for the async-daemon projection-safe read path (#4667 Phase 2).");

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public abstract TId Identity(T document);

Expand Down
17 changes: 17 additions & 0 deletions src/Marten/Internal/Storage/IDocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,23 @@ public interface IDocumentStorage<T, TId>: IDocumentStorage<T>, IIdentitySetter<

Task<IReadOnlyList<T>> LoadManyAsync(TId[] ids, IMartenSession session, CancellationToken token);

/// <summary>
/// Session-free Load for projection storage (#4667 Phase 2). Opens a fresh
/// connection from the database, executes the load SQL, and returns the
/// deserialized document. Does not touch any session-shared state — no
/// version/revision tracker writes, no ItemMap updates, no
/// <c>MarkAsDocumentLoaded</c>, no <c>ChangeTrackers</c> writes. Safe to call
/// from parallel async-daemon slice handlers that share an
/// <see cref="IMartenSession"/>.
/// </summary>
Task<T?> LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token);

/// <summary>
/// Session-free LoadMany for projection storage (#4667 Phase 2). See
/// <see cref="LoadProjectedAsync"/>.
/// </summary>
Task<IReadOnlyList<T>> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token);


TId AssignIdentity(T document, string tenantId, IMartenDatabase database);
ISqlFragment ByIdFilter(TId id);
Expand Down
13 changes: 13 additions & 0 deletions src/Marten/Internal/Storage/SubClassDocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,19 @@ public async Task<IReadOnlyList<T>> LoadManyAsync(TId[] ids, IMartenSession sess
return (await _parent.LoadManyAsync(ids, session, token).ConfigureAwait(false)).OfType<T>().ToList();
}

// #4667 Phase 2 — delegate projection loads to the parent hierarchy storage
// and downcast to the subclass like the session-aware path above.
public async Task<T?> LoadProjectedAsync(TId id, IMartenDatabase database, string tenantId, CancellationToken token)
{
var doc = await _parent.LoadProjectedAsync(id, database, tenantId, token).ConfigureAwait(false);
return doc is T x ? x : default;
}

public async Task<IReadOnlyList<T>> LoadManyProjectedAsync(TId[] ids, IMartenDatabase database, string tenantId, CancellationToken token)
{
return (await _parent.LoadManyProjectedAsync(ids, database, tenantId, token).ConfigureAwait(false)).OfType<T>().ToList();
}

public TId AssignIdentity(T document, string tenantId, IMartenDatabase database)
{
return _parent.AssignIdentity(document, tenantId, database);
Expand Down
7 changes: 7 additions & 0 deletions src/Marten/Internal/ValueTypeIdentifiedDocumentStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ public IDeletion DeleteForId(TSimple id, string tenantId)
public Task<IReadOnlyList<TDoc>> LoadManyAsync(TSimple[] ids, IMartenSession session, CancellationToken token)
=> Inner.LoadManyAsync(ids.Select(_converter).ToArray(), session, token);

// #4667 Phase 2 — delegate to inner with the unwrapped id like the session-aware path.
public Task<TDoc?> LoadProjectedAsync(TSimple id, IMartenDatabase database, string tenantId, CancellationToken token)
=> Inner.LoadProjectedAsync(_converter(id), database, tenantId, token);

public Task<IReadOnlyList<TDoc>> LoadManyProjectedAsync(TSimple[] ids, IMartenDatabase database, string tenantId, CancellationToken token)
=> Inner.LoadManyProjectedAsync(ids.Select(_converter).ToArray(), database, tenantId, token);

public TSimple AssignIdentity(TDoc document, string tenantId, IMartenDatabase database)
=> _unwrapper(Inner.AssignIdentity(document, tenantId, database));

Expand Down
Loading