Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions src/DaemonTests/using_patches_in_async_mode.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using System;
using System.Threading.Tasks;
using JasperFx.Core;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Patching;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace DaemonTests;

public class using_patches_in_async_mode : OneOffConfigurationsContext
{
[Fact]
public async Task do_some_patching()
{
StoreOptions(opts =>
{
opts.Projections.Add(new LetterPatcher(), ProjectionLifecycle.Async);
});

var id1 = Guid.NewGuid();
var id2 = Guid.NewGuid();
var id3 = Guid.NewGuid();

theSession.Events.StartStream<SimpleAggregate>(id1, new StartAggregate(), new AEvent(), new AEvent(), new BEvent());
theSession.Events.StartStream<SimpleAggregate>(id2, new StartAggregate(), new AEvent(), new CEvent(), new CEvent());
theSession.Events.StartStream<SimpleAggregate>(id3, new StartAggregate(), new BEvent(), new BEvent(), new BEvent(), new CEvent());

for (int i = 0; i < 100; i++)
{
theSession.Events.StartStream<SimpleAggregate>(new StartAggregate(), new AEvent(), new AEvent(), new BEvent());
}

await theSession.SaveChangesAsync();

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();
await daemon.WaitForNonStaleData(20.Seconds());

var aggregate1 = await theSession.LoadAsync<SimpleAggregate>(id1);
aggregate1.ACount.ShouldBe(2);
aggregate1.BCount.ShouldBe(1);

var aggregate2 = await theSession.LoadAsync<SimpleAggregate>(id2);
aggregate2.CCount.ShouldBe(2);


}
}

public record StartAggregate;

public class LetterPatcher: EventProjection
{
public SimpleAggregate Transform(IEvent<StartAggregate> e) => new SimpleAggregate { Id = e.StreamId };

public void Project(IEvent<AEvent> e, IDocumentOperations ops)
{
ops.Patch<SimpleAggregate>(e.StreamId).Increment(x => x.ACount);
}

public void Project(IEvent<BEvent> e, IDocumentOperations ops)
{
ops.Patch<SimpleAggregate>(e.StreamId).Increment(x => x.BCount);
}

public void Project(IEvent<CEvent> e, IDocumentOperations ops)
{
ops.Patch<SimpleAggregate>(e.StreamId).Increment(x => x.CCount);
}
}


24 changes: 22 additions & 2 deletions src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using Marten.Internal;
using Marten.Internal.Operations;
using Marten.Internal.Sessions;
using Marten.Patching;
using Marten.Services;

namespace Marten.Events.Daemon.Internals;
Expand Down Expand Up @@ -53,10 +54,15 @@ internal ProjectionUpdateBatch(DaemonSettings settings,
startNewPage(session);
}

public Task WaitForCompletion()
public async Task WaitForCompletion()
{
Queue.Complete();
return Queue.Completion;

await Queue.Completion.ConfigureAwait(false);
foreach (var patch in _patches)
{
applyOperation(patch);
}
}

// TODO -- make this private
Expand Down Expand Up @@ -250,11 +256,25 @@ private void startNewPage(IMartenSession session)
_pages.Add(_current);
}

private readonly List<IStorageOperation> _patches = new();

private void processOperation(IStorageOperation operation)
{
if (_token.IsCancellationRequested)
return;

// If there's one patch, then everything needs to be queued up for later
if (operation is PatchOperation || _patches.Any())
{
_patches.Add(operation);
return;
}

applyOperation(operation);
}

private void applyOperation(IStorageOperation operation)
{
_current.Append(operation);

_documentTypes.Fill(operation.DocumentType);
Expand Down
Loading