diff --git a/src/DaemonTests/Bug_4667_projection_load_async_parallel_no_race.cs b/src/DaemonTests/Bug_4667_projection_load_async_parallel_no_race.cs new file mode 100644 index 0000000000..84de272597 --- /dev/null +++ b/src/DaemonTests/Bug_4667_projection_load_async_parallel_no_race.cs @@ -0,0 +1,121 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Events; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events.Aggregation; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace DaemonTests; + +/// +/// #4667 Phase 3 acceptance: a user-supplied aggregation projection whose Apply +/// reaches into from inside +/// the async daemon's 10-wide parallel Block<EventSliceExecution> +/// fan-out must not throw and must produce correct aggregates. Before Phase 3 +/// closed the user-code escape hatch, those LoadAsync calls per-row-wrote into +/// the shared session's Versions / ItemMap / ChangeTrackers +/// — a documented race source under the daemon's shared-session model. +/// +public class Bug_4667_projection_load_async_parallel_no_race: BugIntegrationContext +{ + [Fact] + public async Task user_load_async_inside_apply_under_parallel_daemon_fanout() + { + // 1000-stream rebuild against an aggregation projection whose Apply + // calls session.LoadAsync per event — large enough to push the + // daemon's Block(10, ...) fan-out across multiple parallel waves but + // small enough to keep the test under a minute. UseIdentityMapForAggregates + // is left at its default (false in the daemon path) — the path + // Phase 3 closes. + StoreOptions(opts => opts.Projections.Add(new OrderProjection(), ProjectionLifecycle.Async)); + + // Side document the projection's Apply reaches for. One per stream + // so every event hits a fresh LoadAsync — exercising the chokepoint + // on every iteration rather than re-using a cached identity-map hit. + const int streamCount = 250; + const int eventsPerStream = 4; + + var customerIds = Enumerable.Range(0, streamCount).Select(_ => Guid.NewGuid()).ToArray(); + await using (var session = theStore.LightweightSession()) + { + foreach (var id in customerIds) + { + session.Store(new Bug4667Customer { Id = id, Name = $"customer-{id:N}" }); + } + await session.SaveChangesAsync(); + } + + var streamIds = new Guid[streamCount]; + await using (var session = theStore.LightweightSession()) + { + for (var i = 0; i < streamCount; i++) + { + streamIds[i] = Guid.NewGuid(); + var customerId = customerIds[i]; + session.Events.StartStream( + streamIds[i], + Enumerable.Range(0, eventsPerStream) + .Select(j => (object)new Bug4667ItemPicked(customerId, j)) + .ToArray()); + } + await session.SaveChangesAsync(); + } + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.RebuildProjectionAsync(CancellationToken.None); + + await using var query = theStore.QuerySession(); + var aggregates = await query.LoadManyAsync(streamIds); + + aggregates.Count.ShouldBe(streamCount); + foreach (var agg in aggregates) + { + // The Apply set CustomerName from the side-loaded Bug4667Customer + // and incremented PickedCount per event. If the parallel race the + // chokepoint guards against fires, expect either a thrown + // CollectionMutated / KeyNotFound or an aggregate with missing + // CustomerName / wrong PickedCount. + agg.PickedCount.ShouldBe(eventsPerStream); + agg.CustomerName.ShouldNotBeNull(); + agg.CustomerName.ShouldStartWith("customer-"); + } + } +} + +public class Bug4667Order +{ + public Guid Id { get; set; } + public string CustomerName { get; set; } = string.Empty; + public int PickedCount { get; set; } +} + +public class Bug4667Customer +{ + public Guid Id { get; set; } + public string Name { get; set; } = string.Empty; +} + +public record Bug4667ItemPicked(Guid CustomerId, int Index); + +public partial class OrderProjection: SingleStreamProjection +{ + public async Task Apply(Bug4667ItemPicked @event, Bug4667Order order, IQuerySession session) + { + // The session here is the daemon's shared ProjectionDocumentSession + // for the (range, tenant). LoadAsync routes through + // the QuerySession.ExecuteLoadOneAsync chokepoint and, by Phase 3, + // dispatches to IDocumentStorage<,>.LoadProjectedAsync — bypassing + // every session-shared dictionary. + var customer = await session.LoadAsync(@event.CustomerId); + if (customer is not null) + { + order.CustomerName = customer.Name; + } + order.PickedCount++; + } +} diff --git a/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs b/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs index e206a16b04..421adbe53e 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionDocumentSession.cs @@ -1,4 +1,7 @@ +#nullable enable using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; using JasperFx.Events.Daemon; @@ -71,4 +74,31 @@ public void AddTransactionParticipant(ITransactionParticipant participant) batch.AddTransactionParticipant(participant); } } + + /// + /// #4667 Phase 3 — close the user-code escape hatch. When user-supplied + /// operations.LoadAsync<X>(...) runs inside an aggregation + /// projection's EvolveAsync, route it through + /// so the + /// daemon's shared 10-wide parallel Block<EventSliceExecution> + /// workers never touch _session.Versions / _session.ItemMap / + /// _session.ChangeTrackers. The opt-in + /// + /// path falls through to the base session-aware route (the GH-3850 + /// inline-projection identity-map semantics; documented as not safe for + /// parallel projection workers). + /// + // Return is bare Task (with [return: MaybeNull] on the base) — see the + // chokepoint declaration in QuerySession.Load.cs for why. + [return: MaybeNull] + protected internal override Task ExecuteLoadOneAsync(IDocumentStorage storage, TId id, CancellationToken token) + => Options.EventGraph.UseIdentityMapForAggregates + ? base.ExecuteLoadOneAsync(storage, id, token) + : storage.LoadProjectedAsync(id, Database, TenantId, token)!; + + /// + protected internal override Task> ExecuteLoadManyAsync(IDocumentStorage storage, TId[] ids, CancellationToken token) + => Options.EventGraph.UseIdentityMapForAggregates + ? base.ExecuteLoadManyAsync(storage, ids, token) + : storage.LoadManyProjectedAsync(ids, Database, TenantId, token); } diff --git a/src/Marten/Internal/Sessions/QuerySession.Load.cs b/src/Marten/Internal/Sessions/QuerySession.Load.cs index 090a30b82f..7dc069e41d 100644 --- a/src/Marten/Internal/Sessions/QuerySession.Load.cs +++ b/src/Marten/Internal/Sessions/QuerySession.Load.cs @@ -2,13 +2,13 @@ using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Threading; using System.Threading.Tasks; using JasperFx.Core.Reflection; using Marten.Exceptions; using Marten.Internal.Storage; -using System.Diagnostics.CodeAnalysis; namespace Marten.Internal.Sessions; @@ -18,11 +18,42 @@ namespace Marten.Internal.Sessions; Justification = "Class-level: uses Type.MakeGenericType / MethodInfo.MakeGenericMethod / Activator.CreateInstance / FastExpressionCompiler — runtime code generation. AOT consumers pre-generate codegen artifacts (codegen write) and supply source-generator-backed serializer impls per the AOT publishing guide.")] public partial class QuerySession { + /// + /// Virtual chokepoint for the public + /// family. Default routes through + /// (session-aware: writes session-shared trackers per row). + /// overrides + /// to dispatch through + /// instead so user-supplied operations.LoadAsync<X>(...) calls from + /// inside an aggregation projection's EvolveAsync never touch + /// _session.Versions / _session.ItemMap / _session.ChangeTrackers + /// (#4667 Phase 3). + /// + /// + /// Uses [return: MaybeNull] + bare T rather than T? + /// because the override on 's descendants + /// (a partial-class chain not all in nullable-enable context) loses the + /// reference-type-vs-Nullable<T> disambiguation of T? at the + /// override site; the attribute form is unambiguous either way. + /// + [return: MaybeNull] + protected internal virtual Task ExecuteLoadOneAsync(IDocumentStorage storage, TId id, CancellationToken token) + where T : notnull where TId : notnull + => storage.LoadAsync(id, this, token)!; + + /// + /// Virtual chokepoint for the public + /// family. See . + /// + protected internal virtual Task> ExecuteLoadManyAsync(IDocumentStorage storage, TId[] ids, CancellationToken token) + where T : notnull where TId : notnull + => storage.LoadManyAsync(ids, this, token); + public async Task LoadAsync(string id, CancellationToken token = default) where T : notnull { assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false); - var document = await StorageFor().LoadAsync(id, this, token).ConfigureAwait(false); + var document = await ExecuteLoadOneAsync(StorageFor(), id, token).ConfigureAwait(false); return document; } @@ -50,7 +81,7 @@ private class Loader: ILoader { public async Task LoadAsync(object id, QuerySession session, CancellationToken token = default) where T : notnull { - var document = await session.StorageFor().LoadAsync((TId)id, session, token).ConfigureAwait(false); + var document = await session.ExecuteLoadOneAsync(session.StorageFor(), (TId)id, token).ConfigureAwait(false); return document; } @@ -64,8 +95,8 @@ private class Loader: ILoader var document = storage switch { - IDocumentStorage i => await i.LoadAsync(id, this, token).ConfigureAwait(false), - IDocumentStorage l => await l.LoadAsync(id, this, token).ConfigureAwait(false), + IDocumentStorage i => await ExecuteLoadOneAsync(i, id, token).ConfigureAwait(false), + IDocumentStorage l => await ExecuteLoadOneAsync(l, (long)id, token).ConfigureAwait(false), _ => throw new DocumentIdTypeMismatchException( $"The identity type for document type {typeof(T).FullNameInCode()} is not numeric") }; @@ -77,7 +108,7 @@ private class Loader: ILoader { assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false); - var document = await StorageFor().LoadAsync(id, this, token).ConfigureAwait(false); + var document = await ExecuteLoadOneAsync(StorageFor(), id, token).ConfigureAwait(false); return document; } @@ -86,7 +117,7 @@ private class Loader: ILoader { assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false); - var document = await StorageFor().LoadAsync(id, this, token).ConfigureAwait(false); + var document = await ExecuteLoadOneAsync(StorageFor(), id, token).ConfigureAwait(false); return document; } @@ -96,7 +127,7 @@ public async Task> LoadManyAsync(params string[] ids) where assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false); var documentStorage = StorageFor(); - return await documentStorage.LoadManyAsync(ids, this, default).ConfigureAwait(false); + return await ExecuteLoadManyAsync(documentStorage, ids, default).ConfigureAwait(false); } public async Task> LoadManyAsync(IEnumerable ids) where T : notnull @@ -104,7 +135,7 @@ public async Task> LoadManyAsync(IEnumerable ids) wh assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false); var documentStorage = StorageFor(); - return await documentStorage.LoadManyAsync(ids.ToArray(), this, default).ConfigureAwait(false); + return await ExecuteLoadManyAsync(documentStorage, ids.ToArray(), default).ConfigureAwait(false); } public async Task> LoadManyAsync(CancellationToken token, params string[] ids) where T : notnull @@ -112,7 +143,7 @@ public async Task> LoadManyAsync(CancellationToken token, pa assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false); var documentStorage = StorageFor(); - return await documentStorage.LoadManyAsync(ids, this, token).ConfigureAwait(false); + return await ExecuteLoadManyAsync(documentStorage, ids, token).ConfigureAwait(false); } public async Task> LoadManyAsync(CancellationToken token, IEnumerable ids) @@ -121,7 +152,7 @@ public async Task> LoadManyAsync(CancellationToken token, IE assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false); var documentStorage = StorageFor(); - return await documentStorage.LoadManyAsync(ids.ToArray(), this, token).ConfigureAwait(false); + return await ExecuteLoadManyAsync(documentStorage, ids.ToArray(), token).ConfigureAwait(false); } public Task> LoadManyAsync(params int[] ids) where T : notnull @@ -142,12 +173,12 @@ public async Task> LoadManyAsync(CancellationToken token, pa var storage = StorageFor(); if (storage is IDocumentStorage i) { - return await i.LoadManyAsync(ids, this, token).ConfigureAwait(false); + return await ExecuteLoadManyAsync(i, ids, token).ConfigureAwait(false); } if (storage is IDocumentStorage l) { - return await l.LoadManyAsync(ids.Select(x => (long)x).ToArray(), this, token).ConfigureAwait(false); + return await ExecuteLoadManyAsync(l, ids.Select(x => (long)x).ToArray(), token).ConfigureAwait(false); } @@ -165,7 +196,7 @@ public async Task> LoadManyAsync(params long[] ids) where T assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false); var documentStorage = StorageFor(); - return await documentStorage.LoadManyAsync(ids, this, default).ConfigureAwait(false); + return await ExecuteLoadManyAsync(documentStorage, ids, default).ConfigureAwait(false); } public async Task> LoadManyAsync(IEnumerable ids) where T : notnull @@ -173,7 +204,7 @@ public async Task> LoadManyAsync(IEnumerable ids) wher assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false); var documentStorage = StorageFor(); - return await documentStorage.LoadManyAsync(ids.ToArray(), this, default).ConfigureAwait(false); + return await ExecuteLoadManyAsync(documentStorage, ids.ToArray(), default).ConfigureAwait(false); } public async Task> LoadManyAsync(CancellationToken token, params long[] ids) where T : notnull @@ -181,7 +212,7 @@ public async Task> LoadManyAsync(CancellationToken token, pa assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false); var documentStorage = StorageFor(); - return await documentStorage.LoadManyAsync(ids, this, token).ConfigureAwait(false); + return await ExecuteLoadManyAsync(documentStorage, ids, token).ConfigureAwait(false); } public async Task> LoadManyAsync(CancellationToken token, IEnumerable ids) @@ -190,7 +221,7 @@ public async Task> LoadManyAsync(CancellationToken token, IE assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false); var documentStorage = StorageFor(); - return await documentStorage.LoadManyAsync(ids.ToArray(), this, token).ConfigureAwait(false); + return await ExecuteLoadManyAsync(documentStorage, ids.ToArray(), token).ConfigureAwait(false); } public async Task> LoadManyAsync(params Guid[] ids) where T : notnull @@ -198,21 +229,21 @@ public async Task> LoadManyAsync(params Guid[] ids) where T assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false); var documentStorage = StorageFor(); - return await documentStorage.LoadManyAsync(ids, this, default).ConfigureAwait(false); + return await ExecuteLoadManyAsync(documentStorage, ids, default).ConfigureAwait(false); } public async Task> LoadManyAsync(IEnumerable ids) where T : notnull { assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T)).ConfigureAwait(false); - return await StorageFor().LoadManyAsync(ids.ToArray(), this, default).ConfigureAwait(false); + return await ExecuteLoadManyAsync(StorageFor(), ids.ToArray(), default).ConfigureAwait(false); } public async Task> LoadManyAsync(CancellationToken token, params Guid[] ids) where T : notnull { assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false); - return await StorageFor().LoadManyAsync(ids, this, token).ConfigureAwait(false); + return await ExecuteLoadManyAsync(StorageFor(), ids, token).ConfigureAwait(false); } public async Task> LoadManyAsync(CancellationToken token, IEnumerable ids) @@ -220,6 +251,6 @@ public async Task> LoadManyAsync(CancellationToken token, IE { assertNotDisposed(); await Database.EnsureStorageExistsAsync(typeof(T), token).ConfigureAwait(false); - return await StorageFor().LoadManyAsync(ids.ToArray(), this, token).ConfigureAwait(false); + return await ExecuteLoadManyAsync(StorageFor(), ids.ToArray(), token).ConfigureAwait(false); } }