Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 49 additions & 2 deletions src/DaemonTests/Composites/end_to_end_with_composite_projection.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,58 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DaemonTests.Aggregations;
using DaemonTests.TeleHealth;
using DaemonTests.TestingSupport;
using JasperFx.Core;
using JasperFx.Events;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Shouldly;
using Xunit;
using Xunit.Abstractions;
using Appointment = EventSourcingTests.Examples.TeleHealth.Appointment;

namespace DaemonTests.Composites;

public class TripMetrics
{
public Guid Id { get; set; }
public int TotalStarted { get; set; }
public int TotalEnded { get; set; }
}

public class TripMetricsProjection: IProjection
{
public Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<IEvent> events,
CancellationToken cancellation)
{
foreach (var e in events)
{
switch (e.Data)
{
case TripStarted:
operations.Store(new TripMetrics
{
Id = e.StreamId,
TotalStarted = 1
});
break;
case TripEnded:
operations.Store(new TripMetrics
{
Id = e.StreamId,
TotalEnded = 1
});
break;
}
}

return Task.CompletedTask;
}
}

public class end_to_end_with_composite_projection : DaemonContext
{
public end_to_end_with_composite_projection(ITestOutputHelper output) : base(output)
Expand All @@ -32,6 +72,7 @@ public async Task run_with_multiple_projections()
{
x.Add<TestingSupport.TripProjection>();
x.Add<DayProjection>();
x.Add(new TripMetricsProjection());
});
}, true);

Expand Down Expand Up @@ -65,6 +106,12 @@ public async Task run_with_multiple_projections()
days.Any(x => x.Id == day).ShouldBeTrue();
}

// Verify the custom IProjection was executed within the composite
var tripMetrics = await theSession.Query<TripMetrics>().ToListAsync();
tripMetrics.Count.ShouldBeGreaterThan(0);
tripMetrics.Any(m => m.TotalStarted > 0).ShouldBeTrue();


// Persist all of the progressions of the constituent parts
var progressions = await theStore.Advanced.AllProjectionProgress();
progressions.Single(x => x.ShardName == "Trips:All").Sequence.ShouldBe(NumberOfEvents);
Expand Down
35 changes: 35 additions & 0 deletions src/DaemonTests/Composites/multi_stage_projections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
using JasperFx.MultiTenancy;
using Marten;
using Marten.Events;
using Marten.Events.Projections;
using Marten.Schema;
using Marten.Storage;
using Shouldly;
using Xunit;
Expand Down Expand Up @@ -203,6 +205,7 @@ public async Task end_to_end(TenancyStyle tenancyStyle)
projection.Add<ProviderShiftProjection>();
projection.Add<AppointmentProjection>();
projection.Snapshot<Board>();
projection.Add(new AppointmentMetricsProjection());

// 2nd stage projections
projection.Add<AppointmentDetailsProjection>(2);
Expand Down Expand Up @@ -241,6 +244,11 @@ public async Task end_to_end(TenancyStyle tenancyStyle)
// Got appointments
(await _compositeSession.Query<Appointment>().CountAsync()).ShouldBeGreaterThan(0);

// Verify the custom IProjection counted appointment requests by specialty
var appointmentMetrics = await _compositeSession.Query<AppointmentMetrics>().ToListAsync();
appointmentMetrics.Count.ShouldBeGreaterThan(0);
appointmentMetrics.ShouldAllBe(m => m.Count > 0);

// Got details from the 2nd stage projection!
(await _compositeSession.Query<AppointmentDetails>().CountAsync()).ShouldBeGreaterThan(0);
(await _compositeSession.Query<AppointmentByExternalIdentifier>().CountAsync()).ShouldBeGreaterThan(0);
Expand Down Expand Up @@ -326,3 +334,30 @@ private async Task assignProvidersToAppointments()
await _compositeSession.SaveChangesAsync();
}
}

public class AppointmentMetrics
{
[Identity]
public string SpecialtyCode { get; set; }
public int Count { get; set; }
}

public class AppointmentMetricsProjection: IProjection
{
public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList<IEvent> events,
CancellationToken cancellation)
{
var groups = events
.Where(e => e.Data is AppointmentRequested)
.Select(e => (AppointmentRequested)e.Data)
.GroupBy(r => r.SpecialtyCode);

foreach (var group in groups)
{
var metrics = await operations.LoadAsync<AppointmentMetrics>(group.Key, cancellation)
?? new AppointmentMetrics { SpecialtyCode = group.Key };
metrics.Count += group.Count();
operations.Store(metrics);
}
}
}
138 changes: 138 additions & 0 deletions src/Marten/Events/Projections/CompositeProjection.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core.Reflection;
using JasperFx.Events;
using JasperFx.Events.Aggregation;
using JasperFx.Events.Daemon;
using JasperFx.Events.Descriptors;
using JasperFx.Events.Projections;
using JasperFx.Events.Projections.Composite;
using JasperFx.Events.Subscriptions;
using Marten.Events.Aggregation;
using Marten.Schema;
using Microsoft.Extensions.Logging;

namespace Marten.Events.Projections;

Expand Down Expand Up @@ -57,6 +68,18 @@ public void Add(IProjectionSource<IDocumentOperations, IQuerySession> projection
StageFor(stageNumber).Add(projection);
}

/// <summary>
/// Add a custom IProjection implementation to be executed within this composite.
/// The projection will be wrapped internally for composite-safe execution.
/// </summary>
/// <param name="projection">The custom IProjection implementation</param>
/// <param name="stageNumber">Optionally move the execution to a later stage. The default is 1</param>
public void Add(IProjection projection, int stageNumber = 1)
{
var wrapper = new CompositeIProjectionSource(projection);
StageFor(stageNumber).Add(wrapper);
}

/// <summary>
/// Add a projection to be executed within this composite. The stage number is optional
/// </summary>
Expand All @@ -67,3 +90,118 @@ public void Add(IProjectionSource<IDocumentOperations, IQuerySession> projection
Add(new T(), stageNumber);
}
}

/// <summary>
/// Wraps an IProjection for use inside a CompositeProjection, providing composite-safe
/// batch lifecycle management. Unlike ProjectionWrapper which uses ProjectionExecution
/// (and unconditionally disposes shared batches), this wrapper only disposes batches it owns.
/// </summary>
internal class CompositeIProjectionSource :
ProjectionBase,
IProjectionSource<IDocumentOperations, IQuerySession>,
ISubscriptionFactory<IDocumentOperations, IQuerySession>
{
private readonly IProjection _projection;

public CompositeIProjectionSource(IProjection projection)
{
_projection = projection;
Lifecycle = ProjectionLifecycle.Async;
Name = projection.GetType().Name;
Version = 1;
if (_projection.GetType().TryGetAttribute<ProjectionVersionAttribute>(out var att))
{
Version = att.Version;
}
}

public SubscriptionType Type => SubscriptionType.EventProjection;
public ShardName[] ShardNames() => [new ShardName(Name, ShardName.All, Version)];
public Type ImplementationType => _projection.GetType();
public SubscriptionDescriptor Describe(IEventStore store) => new(this, store);

public IReadOnlyList<AsyncShard<IDocumentOperations, IQuerySession>> Shards()
{
return
[
new AsyncShard<IDocumentOperations, IQuerySession>(Options, ShardRole.Projection,
new ShardName(Name, "All", Version), this, this)
];
}

public bool TryBuildReplayExecutor(IEventStore<IDocumentOperations, IQuerySession> store, IEventDatabase database,
[NotNullWhen(true)] out IReplayExecutor? executor)
{
executor = default;
return false;
}

public IInlineProjection<IDocumentOperations> BuildForInline()
{
throw new NotSupportedException("CompositeIProjectionSource does not support inline execution");
}

public ISubscriptionExecution BuildExecution(IEventStore<IDocumentOperations, IQuerySession> store,
IEventDatabase database, ILoggerFactory loggerFactory, ShardName shardName)
{
return new CompositeIProjectionExecution(_projection, shardName);
}

public ISubscriptionExecution BuildExecution(IEventStore<IDocumentOperations, IQuerySession> store,
IEventDatabase database, ILogger logger, ShardName shardName)
{
return new CompositeIProjectionExecution(_projection, shardName);
}
}

/// <summary>
/// A lightweight ISubscriptionExecution for IProjection instances running inside a composite.
/// Does NOT dispose the shared batch — the composite manages batch lifecycle.
/// </summary>
internal class CompositeIProjectionExecution : ISubscriptionExecution
{
private readonly IProjection _projection;

public CompositeIProjectionExecution(IProjection projection, ShardName shardName)
{
_projection = projection;
ShardName = shardName;
}

public ShardName ShardName { get; }
public ShardExecutionMode Mode { get; set; }

public async Task ProcessRangeAsync(EventRange range)
{
var batch = range.ActiveBatch as IProjectionBatch<IDocumentOperations, IQuerySession>;
if (batch == null) return;

var groups = range.Events.GroupBy(x => x.TenantId).ToArray();
foreach (var group in groups)
{
await using var session = batch.SessionForTenant(group.Key);
await _projection.ApplyAsync(session, group.ToList(), CancellationToken.None).ConfigureAwait(false);
}
}

public ValueTask EnqueueAsync(EventPage page, ISubscriptionAgent subscriptionAgent) => new();
public Task StopAndDrainAsync(CancellationToken token) => Task.CompletedTask;
public Task HardStopAsync() => Task.CompletedTask;

public bool TryBuildReplayExecutor([NotNullWhen(true)] out IReplayExecutor? executor)
{
executor = default;
return false;
}

public Task ProcessImmediatelyAsync(SubscriptionAgent subscriptionAgent, EventPage events,
CancellationToken cancellation) => Task.CompletedTask;

public bool TryGetAggregateCache<TId, TDoc>([NotNullWhen(true)] out IAggregateCaching<TId, TDoc>? caching)
{
caching = null;
return false;
}

public ValueTask DisposeAsync() => new();
}
Loading