diff --git a/src/EventSourcingTests/Bugs/Bug_3661_await_custom_projection_slicing.cs b/src/EventSourcingTests/Bugs/Bug_3661_await_custom_projection_slicing.cs new file mode 100644 index 0000000000..ceca0afe40 --- /dev/null +++ b/src/EventSourcingTests/Bugs/Bug_3661_await_custom_projection_slicing.cs @@ -0,0 +1,75 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using EventSourcingTests.Examples; +using JasperFx.Core; +using Marten; +using Marten.Events; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Internal.Sessions; +using Marten.Storage; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Bugs; + +public class Bug_3661_await_custom_projection_slicing : OneOffConfigurationsContext +{ + [Fact] + public async Task fetching_multiple_items_from_slicers_in_async_custom_projection() + { + StoreOptions(opts => opts.Projections.Add(new StartAndStopIteratingAwaitablesSlicedProjection(), ProjectionLifecycle.Async)); + + var stream = Guid.NewGuid(); + theSession.Store(new Document1 { Id = stream }); + theSession.Events.StartStream(stream, new Start(), new Increment(), new Increment()); + + var stream2 = Guid.NewGuid(); + theSession.Store(new Document1 { Id = stream2 }); + theSession.Events.StartStream(stream2, new Start(), new Increment(), new Increment()); + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(20.Seconds()); + + var aggregate = await theSession.LoadAsync(stream); + aggregate.Count.ShouldBe(2); + var aggregate2 = await theSession.LoadAsync(stream2); + aggregate2.Count.ShouldBe(2); + } +} + +public class StartAndStopIteratingAwaitablesSlicedProjection: CustomProjection, IEventSlicer +{ + public StartAndStopIteratingAwaitablesSlicedProjection() + { + UseCustomSlicer(this); + IncludeType(); + IncludeType(); + } + + public override ValueTask ApplyChangesAsync(DocumentSessionBase session, + EventSlice slice, CancellationToken cancellation, + ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline) => + new StartAndStopProjection().ApplyChangesAsync(session, slice, cancellation, lifecycle); + + public ValueTask>> SliceInlineActions(IQuerySession querySession, IEnumerable streams) => throw new NotImplementedException(); + + public async ValueTask>> SliceAsyncEvents(IQuerySession querySession, List events) + { + var aggregateId = events.First().StreamId; + var group = new TenantSliceGroup(Tenant.ForDatabase(querySession.Database)); + foreach (var @event in events) + { + await querySession.LoadAsync(@event.StreamId); + group.AddEvent(@event.StreamId, @event); + } + return [group]; + } +} diff --git a/src/Marten/Events/Aggregation/CustomProjection.cs b/src/Marten/Events/Aggregation/CustomProjection.cs index bff1881c72..d6bab18753 100644 --- a/src/Marten/Events/Aggregation/CustomProjection.cs +++ b/src/Marten/Events/Aggregation/CustomProjection.cs @@ -10,7 +10,6 @@ using Marten.Events.Daemon.Internals; using Marten.Events.Projections; using Marten.Exceptions; -using Marten.Internal; using Marten.Internal.Sessions; using Marten.Internal.Storage; using Marten.Schema; @@ -203,12 +202,12 @@ public virtual bool IsNew(EventSlice slice) /// /// /// - public ValueTask>> GroupEventRange(DocumentStore store, + public async ValueTask>> GroupEventRange(DocumentStore store, IMartenDatabase database, EventRange range, CancellationToken cancellation) { - using var session = store.LightweightSession(SessionOptions.ForDatabase(database)); - return Slicer.SliceAsyncEvents(session, range.Events); + await using var session = store.LightweightSession(SessionOptions.ForDatabase(database)); + return await Slicer.SliceAsyncEvents(session, range.Events).ConfigureAwait(false); } Type IReadOnlyProjectionData.ProjectionType => GetType();