Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions src/DaemonTests/Bug_4667_projection_load_async_parallel_no_race.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events.Aggregation;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace DaemonTests;

/// <summary>
/// #4667 Phase 3 acceptance: a user-supplied aggregation projection whose Apply
/// reaches into <see cref="Marten.IDocumentOperations.LoadAsync{T}"/> from inside
/// the async daemon's 10-wide parallel <c>Block&lt;EventSliceExecution&gt;</c>
/// fan-out must not throw and must produce correct aggregates. Before Phase 3
/// closed the user-code escape hatch, those LoadAsync calls per-row-wrote into
/// the shared session's <c>Versions</c> / <c>ItemMap</c> / <c>ChangeTrackers</c>
/// — a documented race source under the daemon's shared-session model.
/// </summary>
public class Bug_4667_projection_load_async_parallel_no_race: BugIntegrationContext
{
[Fact]
public async Task user_load_async_inside_apply_under_parallel_daemon_fanout()
{
// 1000-stream rebuild against an aggregation projection whose Apply
// calls session.LoadAsync<User> per event — large enough to push the
// daemon's Block(10, ...) fan-out across multiple parallel waves but
// small enough to keep the test under a minute. UseIdentityMapForAggregates
// is left at its default (false in the daemon path) — the path
// Phase 3 closes.
StoreOptions(opts => opts.Projections.Add(new OrderProjection(), ProjectionLifecycle.Async));

// Side document the projection's Apply reaches for. One per stream
// so every event hits a fresh LoadAsync — exercising the chokepoint
// on every iteration rather than re-using a cached identity-map hit.
const int streamCount = 250;
const int eventsPerStream = 4;

var customerIds = Enumerable.Range(0, streamCount).Select(_ => Guid.NewGuid()).ToArray();
await using (var session = theStore.LightweightSession())
{
foreach (var id in customerIds)
{
session.Store(new Bug4667Customer { Id = id, Name = $"customer-{id:N}" });
}
await session.SaveChangesAsync();
}

var streamIds = new Guid[streamCount];
await using (var session = theStore.LightweightSession())
{
for (var i = 0; i < streamCount; i++)
{
streamIds[i] = Guid.NewGuid();
var customerId = customerIds[i];
session.Events.StartStream<Bug4667Order>(
streamIds[i],
Enumerable.Range(0, eventsPerStream)
.Select(j => (object)new Bug4667ItemPicked(customerId, j))
.ToArray());
}
await session.SaveChangesAsync();
}

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.RebuildProjectionAsync<Bug4667Order>(CancellationToken.None);

await using var query = theStore.QuerySession();
var aggregates = await query.LoadManyAsync<Bug4667Order>(streamIds);

aggregates.Count.ShouldBe(streamCount);
foreach (var agg in aggregates)
{
// The Apply set CustomerName from the side-loaded Bug4667Customer
// and incremented PickedCount per event. If the parallel race the
// chokepoint guards against fires, expect either a thrown
// CollectionMutated / KeyNotFound or an aggregate with missing
// CustomerName / wrong PickedCount.
agg.PickedCount.ShouldBe(eventsPerStream);
agg.CustomerName.ShouldNotBeNull();
agg.CustomerName.ShouldStartWith("customer-");
}
}
}

public class Bug4667Order
{
public Guid Id { get; set; }
public string CustomerName { get; set; } = string.Empty;
public int PickedCount { get; set; }
}

public class Bug4667Customer
{
public Guid Id { get; set; }
public string Name { get; set; } = string.Empty;
}

public record Bug4667ItemPicked(Guid CustomerId, int Index);

public partial class OrderProjection: SingleStreamProjection<Bug4667Order, Guid>
{
public async Task Apply(Bug4667ItemPicked @event, Bug4667Order order, IQuerySession session)
{
// The session here is the daemon's shared ProjectionDocumentSession
// for the (range, tenant). LoadAsync<Bug4667Customer> routes through
// the QuerySession.ExecuteLoadOneAsync chokepoint and, by Phase 3,
// dispatches to IDocumentStorage<,>.LoadProjectedAsync — bypassing
// every session-shared dictionary.
var customer = await session.LoadAsync<Bug4667Customer>(@event.CustomerId);
if (customer is not null)
{
order.CustomerName = customer.Name;
}
order.PickedCount++;
}
}
30 changes: 30 additions & 0 deletions src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Events.Daemon;
Expand Down Expand Up @@ -71,4 +74,31 @@ public void AddTransactionParticipant(ITransactionParticipant participant)
batch.AddTransactionParticipant(participant);
}
}

/// <summary>
/// #4667 Phase 3 — close the user-code escape hatch. When user-supplied
/// <c>operations.LoadAsync&lt;X&gt;(...)</c> runs inside an aggregation
/// projection's EvolveAsync, route it through
/// <see cref="IDocumentStorage{T, TId}.LoadProjectedAsync"/> so the
/// daemon's shared 10-wide parallel <c>Block&lt;EventSliceExecution&gt;</c>
/// workers never touch <c>_session.Versions</c> / <c>_session.ItemMap</c> /
/// <c>_session.ChangeTrackers</c>. The opt-in
/// <see cref="Marten.Events.EventGraph.UseIdentityMapForAggregates"/>
/// path falls through to the base session-aware route (the GH-3850
/// inline-projection identity-map semantics; documented as not safe for
/// parallel projection workers).
/// </summary>
// Return is bare Task<T> (with [return: MaybeNull] on the base) — see the
// chokepoint declaration in QuerySession.Load.cs for why.
[return: MaybeNull]
protected internal override Task<T> ExecuteLoadOneAsync<T, TId>(IDocumentStorage<T, TId> storage, TId id, CancellationToken token)
=> Options.EventGraph.UseIdentityMapForAggregates
? base.ExecuteLoadOneAsync(storage, id, token)
: storage.LoadProjectedAsync(id, Database, TenantId, token)!;

/// <inheritdoc cref="ExecuteLoadOneAsync{T, TId}"/>
protected internal override Task<IReadOnlyList<T>> ExecuteLoadManyAsync<T, TId>(IDocumentStorage<T, TId> storage, TId[] ids, CancellationToken token)
=> Options.EventGraph.UseIdentityMapForAggregates
? base.ExecuteLoadManyAsync(storage, ids, token)
: storage.LoadManyProjectedAsync(ids, Database, TenantId, token);
}
73 changes: 52 additions & 21 deletions src/Marten/Internal/Sessions/QuerySession.Load.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core.Reflection;
using Marten.Exceptions;
using Marten.Internal.Storage;
using System.Diagnostics.CodeAnalysis;

namespace Marten.Internal.Sessions;

Expand All @@ -18,11 +18,42 @@ namespace Marten.Internal.Sessions;
Justification = "Class-level: uses Type.MakeGenericType / MethodInfo.MakeGenericMethod / Activator.CreateInstance / FastExpressionCompiler — runtime code generation. AOT consumers pre-generate codegen artifacts (codegen write) and supply source-generator-backed serializer impls per the AOT publishing guide.")]
public partial class QuerySession
{
/// <summary>
/// Virtual chokepoint for the public <see cref="LoadAsync{T}(Guid, CancellationToken)"/>
/// family. Default routes through <see cref="IDocumentStorage{T, TId}.LoadAsync"/>
/// (session-aware: writes session-shared trackers per row).
/// <see cref="Marten.Events.Daemon.Internals.ProjectionDocumentSession"/> overrides
/// to dispatch through <see cref="IDocumentStorage{T, TId}.LoadProjectedAsync"/>
/// instead so user-supplied <c>operations.LoadAsync&lt;X&gt;(...)</c> calls from
/// inside an aggregation projection's EvolveAsync never touch
/// <c>_session.Versions</c> / <c>_session.ItemMap</c> / <c>_session.ChangeTrackers</c>
/// (#4667 Phase 3).
/// </summary>
/// <remarks>
/// Uses <c>[return: MaybeNull]</c> + bare <c>T</c> rather than <c>T?</c>
/// because the override on <see cref="DocumentSessionBase"/>'s descendants
/// (a partial-class chain not all in nullable-enable context) loses the
/// reference-type-vs-Nullable&lt;T&gt; disambiguation of <c>T?</c> at the
/// override site; the attribute form is unambiguous either way.
/// </remarks>
[return: MaybeNull]
protected internal virtual Task<T> ExecuteLoadOneAsync<T, TId>(IDocumentStorage<T, TId> storage, TId id, CancellationToken token)
where T : notnull where TId : notnull
=> storage.LoadAsync(id, this, token)!;

/// <summary>
/// Virtual chokepoint for the public <see cref="LoadManyAsync{T}(Guid[])"/>
/// family. See <see cref="ExecuteLoadOneAsync{T, TId}"/>.
/// </summary>
protected internal virtual Task<IReadOnlyList<T>> ExecuteLoadManyAsync<T, TId>(IDocumentStorage<T, TId> storage, TId[] ids, CancellationToken token)
where T : notnull where TId : notnull
=> storage.LoadManyAsync(ids, this, token);

public async Task<T?> LoadAsync<T>(string id, CancellationToken token = default) where T : notnull
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false);
var document = await StorageFor<T, string>().LoadAsync(id, this, token).ConfigureAwait(false);
var document = await ExecuteLoadOneAsync(StorageFor<T, string>(), id, token).ConfigureAwait(false);

return document;
}
Expand Down Expand Up @@ -50,7 +81,7 @@ private class Loader<TId>: ILoader
{
public async Task<T?> LoadAsync<T>(object id, QuerySession session, CancellationToken token = default) where T : notnull
{
var document = await session.StorageFor<T, TId>().LoadAsync((TId)id, session, token).ConfigureAwait(false);
var document = await session.ExecuteLoadOneAsync(session.StorageFor<T, TId>(), (TId)id, token).ConfigureAwait(false);

return document;
}
Expand All @@ -64,8 +95,8 @@ private class Loader<TId>: ILoader

var document = storage switch
{
IDocumentStorage<T, int> i => await i.LoadAsync(id, this, token).ConfigureAwait(false),
IDocumentStorage<T, long> l => await l.LoadAsync(id, this, token).ConfigureAwait(false),
IDocumentStorage<T, int> i => await ExecuteLoadOneAsync(i, id, token).ConfigureAwait(false),
IDocumentStorage<T, long> l => await ExecuteLoadOneAsync(l, (long)id, token).ConfigureAwait(false),
_ => throw new DocumentIdTypeMismatchException(
$"The identity type for document type {typeof(T).FullNameInCode()} is not numeric")
};
Expand All @@ -77,7 +108,7 @@ private class Loader<TId>: ILoader
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false);
var document = await StorageFor<T, long>().LoadAsync(id, this, token).ConfigureAwait(false);
var document = await ExecuteLoadOneAsync(StorageFor<T, long>(), id, token).ConfigureAwait(false);

return document;
}
Expand All @@ -86,7 +117,7 @@ private class Loader<TId>: ILoader
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false);
var document = await StorageFor<T, Guid>().LoadAsync(id, this, token).ConfigureAwait(false);
var document = await ExecuteLoadOneAsync(StorageFor<T, Guid>(), id, token).ConfigureAwait(false);

return document;
}
Expand All @@ -96,23 +127,23 @@ public async Task<IReadOnlyList<T>> LoadManyAsync<T>(params string[] ids) where
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false);
var documentStorage = StorageFor<T, string>();
return await documentStorage.LoadManyAsync(ids, this, default).ConfigureAwait(false);
return await ExecuteLoadManyAsync(documentStorage, ids, default).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(IEnumerable<string> ids) where T : notnull
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false);
var documentStorage = StorageFor<T, string>();
return await documentStorage.LoadManyAsync(ids.ToArray(), this, default).ConfigureAwait(false);
return await ExecuteLoadManyAsync(documentStorage, ids.ToArray(), default).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(CancellationToken token, params string[] ids) where T : notnull
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false);
var documentStorage = StorageFor<T, string>();
return await documentStorage.LoadManyAsync(ids, this, token).ConfigureAwait(false);
return await ExecuteLoadManyAsync(documentStorage, ids, token).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(CancellationToken token, IEnumerable<string> ids)
Expand All @@ -121,7 +152,7 @@ public async Task<IReadOnlyList<T>> LoadManyAsync<T>(CancellationToken token, IE
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false);
var documentStorage = StorageFor<T, string>();
return await documentStorage.LoadManyAsync(ids.ToArray(), this, token).ConfigureAwait(false);
return await ExecuteLoadManyAsync(documentStorage, ids.ToArray(), token).ConfigureAwait(false);
}

public Task<IReadOnlyList<T>> LoadManyAsync<T>(params int[] ids) where T : notnull
Expand All @@ -142,12 +173,12 @@ public async Task<IReadOnlyList<T>> LoadManyAsync<T>(CancellationToken token, pa
var storage = StorageFor<T>();
if (storage is IDocumentStorage<T, int> i)
{
return await i.LoadManyAsync(ids, this, token).ConfigureAwait(false);
return await ExecuteLoadManyAsync(i, ids, token).ConfigureAwait(false);
}

if (storage is IDocumentStorage<T, long> l)
{
return await l.LoadManyAsync(ids.Select(x => (long)x).ToArray(), this, token).ConfigureAwait(false);
return await ExecuteLoadManyAsync(l, ids.Select(x => (long)x).ToArray(), token).ConfigureAwait(false);
}


Expand All @@ -165,23 +196,23 @@ public async Task<IReadOnlyList<T>> LoadManyAsync<T>(params long[] ids) where T
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false);
var documentStorage = StorageFor<T, long>();
return await documentStorage.LoadManyAsync(ids, this, default).ConfigureAwait(false);
return await ExecuteLoadManyAsync(documentStorage, ids, default).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(IEnumerable<long> ids) where T : notnull
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false);
var documentStorage = StorageFor<T, long>();
return await documentStorage.LoadManyAsync(ids.ToArray(), this, default).ConfigureAwait(false);
return await ExecuteLoadManyAsync(documentStorage, ids.ToArray(), default).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(CancellationToken token, params long[] ids) where T : notnull
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false);
var documentStorage = StorageFor<T, long>();
return await documentStorage.LoadManyAsync(ids, this, token).ConfigureAwait(false);
return await ExecuteLoadManyAsync(documentStorage, ids, token).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(CancellationToken token, IEnumerable<long> ids)
Expand All @@ -190,36 +221,36 @@ public async Task<IReadOnlyList<T>> LoadManyAsync<T>(CancellationToken token, IE
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false);
var documentStorage = StorageFor<T, long>();
return await documentStorage.LoadManyAsync(ids.ToArray(), this, token).ConfigureAwait(false);
return await ExecuteLoadManyAsync(documentStorage, ids.ToArray(), token).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(params Guid[] ids) where T : notnull
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false);
var documentStorage = StorageFor<T, Guid>();
return await documentStorage.LoadManyAsync(ids, this, default).ConfigureAwait(false);
return await ExecuteLoadManyAsync(documentStorage, ids, default).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(IEnumerable<Guid> ids) where T : notnull
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false);
return await StorageFor<T, Guid>().LoadManyAsync(ids.ToArray(), this, default).ConfigureAwait(false);
return await ExecuteLoadManyAsync(StorageFor<T, Guid>(), ids.ToArray(), default).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(CancellationToken token, params Guid[] ids) where T : notnull
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false);
return await StorageFor<T, Guid>().LoadManyAsync(ids, this, token).ConfigureAwait(false);
return await ExecuteLoadManyAsync(StorageFor<T, Guid>(), ids, token).ConfigureAwait(false);
}

public async Task<IReadOnlyList<T>> LoadManyAsync<T>(CancellationToken token, IEnumerable<Guid> ids)
where T : notnull
{
assertNotDisposed();
await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false);
return await StorageFor<T, Guid>().LoadManyAsync(ids.ToArray(), this, token).ConfigureAwait(false);
return await ExecuteLoadManyAsync(StorageFor<T, Guid>(), ids.ToArray(), token).ConfigureAwait(false);
}
}
Loading