diff --git a/src/DaemonTests/using_patches_in_async_mode.cs b/src/DaemonTests/using_patches_in_async_mode.cs new file mode 100644 index 0000000000..65a657ba73 --- /dev/null +++ b/src/DaemonTests/using_patches_in_async_mode.cs @@ -0,0 +1,76 @@ +using System; +using System.Threading.Tasks; +using JasperFx.Core; +using Marten; +using Marten.Events; +using Marten.Events.Projections; +using Marten.Patching; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace DaemonTests; + +public class using_patches_in_async_mode : OneOffConfigurationsContext +{ + [Fact] + public async Task do_some_patching() + { + StoreOptions(opts => + { + opts.Projections.Add(new LetterPatcher(), ProjectionLifecycle.Async); + }); + + var id1 = Guid.NewGuid(); + var id2 = Guid.NewGuid(); + var id3 = Guid.NewGuid(); + + theSession.Events.StartStream(id1, new StartAggregate(), new AEvent(), new AEvent(), new BEvent()); + theSession.Events.StartStream(id2, new StartAggregate(), new AEvent(), new CEvent(), new CEvent()); + theSession.Events.StartStream(id3, new StartAggregate(), new BEvent(), new BEvent(), new BEvent(), new CEvent()); + + for (int i = 0; i < 100; i++) + { + theSession.Events.StartStream(new StartAggregate(), new AEvent(), new AEvent(), new BEvent()); + } + + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(20.Seconds()); + + var aggregate1 = await theSession.LoadAsync(id1); + aggregate1.ACount.ShouldBe(2); + aggregate1.BCount.ShouldBe(1); + + var aggregate2 = await theSession.LoadAsync(id2); + aggregate2.CCount.ShouldBe(2); + + + } +} + +public record StartAggregate; + +public class LetterPatcher: EventProjection +{ + public SimpleAggregate Transform(IEvent e) => new SimpleAggregate { Id = e.StreamId }; + + public void Project(IEvent e, IDocumentOperations ops) + { + ops.Patch(e.StreamId).Increment(x => x.ACount); + } + + public void Project(IEvent e, IDocumentOperations ops) + { + ops.Patch(e.StreamId).Increment(x => x.BCount); + } + + public void Project(IEvent e, IDocumentOperations ops) + { + ops.Patch(e.StreamId).Increment(x => x.CCount); + } +} + + diff --git a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs index 38de552161..246542dd05 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs @@ -9,6 +9,7 @@ using Marten.Internal; using Marten.Internal.Operations; using Marten.Internal.Sessions; +using Marten.Patching; using Marten.Services; namespace Marten.Events.Daemon.Internals; @@ -53,10 +54,15 @@ internal ProjectionUpdateBatch(DaemonSettings settings, startNewPage(session); } - public Task WaitForCompletion() + public async Task WaitForCompletion() { Queue.Complete(); - return Queue.Completion; + + await Queue.Completion.ConfigureAwait(false); + foreach (var patch in _patches) + { + applyOperation(patch); + } } // TODO -- make this private @@ -250,11 +256,25 @@ private void startNewPage(IMartenSession session) _pages.Add(_current); } + private readonly List _patches = new(); + private void processOperation(IStorageOperation operation) { if (_token.IsCancellationRequested) return; + // If there's one patch, then everything needs to be queued up for later + if (operation is PatchOperation || _patches.Any()) + { + _patches.Add(operation); + return; + } + + applyOperation(operation); + } + + private void applyOperation(IStorageOperation operation) + { _current.Append(operation); _documentTypes.Fill(operation.DocumentType);