diff --git a/src/DaemonTests/Composites/end_to_end_with_composite_projection.cs b/src/DaemonTests/Composites/end_to_end_with_composite_projection.cs index 6c1ee2fddf..123b604b90 100644 --- a/src/DaemonTests/Composites/end_to_end_with_composite_projection.cs +++ b/src/DaemonTests/Composites/end_to_end_with_composite_projection.cs @@ -1,18 +1,58 @@ +using System; +using System.Collections.Generic; using System.Linq; +using System.Threading; using System.Threading.Tasks; using DaemonTests.Aggregations; -using DaemonTests.TeleHealth; using DaemonTests.TestingSupport; using JasperFx.Core; +using JasperFx.Events; using Marten; using Marten.Events; +using Marten.Events.Projections; using Shouldly; using Xunit; using Xunit.Abstractions; -using Appointment = EventSourcingTests.Examples.TeleHealth.Appointment; namespace DaemonTests.Composites; +public class TripMetrics +{ + public Guid Id { get; set; } + public int TotalStarted { get; set; } + public int TotalEnded { get; set; } +} + +public class TripMetricsProjection: IProjection +{ + public Task ApplyAsync(IDocumentOperations operations, IReadOnlyList events, + CancellationToken cancellation) + { + foreach (var e in events) + { + switch (e.Data) + { + case TripStarted: + operations.Store(new TripMetrics + { + Id = e.StreamId, + TotalStarted = 1 + }); + break; + case TripEnded: + operations.Store(new TripMetrics + { + Id = e.StreamId, + TotalEnded = 1 + }); + break; + } + } + + return Task.CompletedTask; + } +} + public class end_to_end_with_composite_projection : DaemonContext { public end_to_end_with_composite_projection(ITestOutputHelper output) : base(output) @@ -32,6 +72,7 @@ public async Task run_with_multiple_projections() { x.Add(); x.Add(); + x.Add(new TripMetricsProjection()); }); }, true); @@ -65,6 +106,12 @@ public async Task run_with_multiple_projections() days.Any(x => x.Id == day).ShouldBeTrue(); } + // Verify the custom IProjection was executed within the composite + var tripMetrics = await theSession.Query().ToListAsync(); + tripMetrics.Count.ShouldBeGreaterThan(0); + tripMetrics.Any(m => m.TotalStarted > 0).ShouldBeTrue(); + + // Persist all of the progressions of the constituent parts var progressions = await theStore.Advanced.AllProjectionProgress(); progressions.Single(x => x.ShardName == "Trips:All").Sequence.ShouldBe(NumberOfEvents); diff --git a/src/DaemonTests/Composites/multi_stage_projections.cs b/src/DaemonTests/Composites/multi_stage_projections.cs index da26192087..6b520e35e1 100644 --- a/src/DaemonTests/Composites/multi_stage_projections.cs +++ b/src/DaemonTests/Composites/multi_stage_projections.cs @@ -13,6 +13,8 @@ using JasperFx.MultiTenancy; using Marten; using Marten.Events; +using Marten.Events.Projections; +using Marten.Schema; using Marten.Storage; using Shouldly; using Xunit; @@ -203,6 +205,7 @@ public async Task end_to_end(TenancyStyle tenancyStyle) projection.Add(); projection.Add(); projection.Snapshot(); + projection.Add(new AppointmentMetricsProjection()); // 2nd stage projections projection.Add(2); @@ -241,6 +244,11 @@ public async Task end_to_end(TenancyStyle tenancyStyle) // Got appointments (await _compositeSession.Query().CountAsync()).ShouldBeGreaterThan(0); + // Verify the custom IProjection counted appointment requests by specialty + var appointmentMetrics = await _compositeSession.Query().ToListAsync(); + appointmentMetrics.Count.ShouldBeGreaterThan(0); + appointmentMetrics.ShouldAllBe(m => m.Count > 0); + // Got details from the 2nd stage projection! (await _compositeSession.Query().CountAsync()).ShouldBeGreaterThan(0); (await _compositeSession.Query().CountAsync()).ShouldBeGreaterThan(0); @@ -326,3 +334,30 @@ private async Task assignProvidersToAppointments() await _compositeSession.SaveChangesAsync(); } } + +public class AppointmentMetrics +{ + [Identity] + public string SpecialtyCode { get; set; } + public int Count { get; set; } +} + +public class AppointmentMetricsProjection: IProjection +{ + public async Task ApplyAsync(IDocumentOperations operations, IReadOnlyList events, + CancellationToken cancellation) + { + var groups = events + .Where(e => e.Data is AppointmentRequested) + .Select(e => (AppointmentRequested)e.Data) + .GroupBy(r => r.SpecialtyCode); + + foreach (var group in groups) + { + var metrics = await operations.LoadAsync(group.Key, cancellation) + ?? new AppointmentMetrics { SpecialtyCode = group.Key }; + metrics.Count += group.Count(); + operations.Store(metrics); + } + } +} diff --git a/src/Marten/Events/Projections/CompositeProjection.cs b/src/Marten/Events/Projections/CompositeProjection.cs index db44144dbe..a6326a2355 100644 --- a/src/Marten/Events/Projections/CompositeProjection.cs +++ b/src/Marten/Events/Projections/CompositeProjection.cs @@ -1,9 +1,20 @@ using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; using JasperFx.Core.Reflection; +using JasperFx.Events; +using JasperFx.Events.Aggregation; +using JasperFx.Events.Daemon; +using JasperFx.Events.Descriptors; using JasperFx.Events.Projections; using JasperFx.Events.Projections.Composite; +using JasperFx.Events.Subscriptions; using Marten.Events.Aggregation; using Marten.Schema; +using Microsoft.Extensions.Logging; namespace Marten.Events.Projections; @@ -57,6 +68,18 @@ public void Add(IProjectionSource projection StageFor(stageNumber).Add(projection); } + /// + /// Add a custom IProjection implementation to be executed within this composite. + /// The projection will be wrapped internally for composite-safe execution. + /// + /// The custom IProjection implementation + /// Optionally move the execution to a later stage. The default is 1 + public void Add(IProjection projection, int stageNumber = 1) + { + var wrapper = new CompositeIProjectionSource(projection); + StageFor(stageNumber).Add(wrapper); + } + /// /// Add a projection to be executed within this composite. The stage number is optional /// @@ -67,3 +90,118 @@ public void Add(IProjectionSource projection Add(new T(), stageNumber); } } + +/// +/// Wraps an IProjection for use inside a CompositeProjection, providing composite-safe +/// batch lifecycle management. Unlike ProjectionWrapper which uses ProjectionExecution +/// (and unconditionally disposes shared batches), this wrapper only disposes batches it owns. +/// +internal class CompositeIProjectionSource : + ProjectionBase, + IProjectionSource, + ISubscriptionFactory +{ + private readonly IProjection _projection; + + public CompositeIProjectionSource(IProjection projection) + { + _projection = projection; + Lifecycle = ProjectionLifecycle.Async; + Name = projection.GetType().Name; + Version = 1; + if (_projection.GetType().TryGetAttribute(out var att)) + { + Version = att.Version; + } + } + + public SubscriptionType Type => SubscriptionType.EventProjection; + public ShardName[] ShardNames() => [new ShardName(Name, ShardName.All, Version)]; + public Type ImplementationType => _projection.GetType(); + public SubscriptionDescriptor Describe(IEventStore store) => new(this, store); + + public IReadOnlyList> Shards() + { + return + [ + new AsyncShard(Options, ShardRole.Projection, + new ShardName(Name, "All", Version), this, this) + ]; + } + + public bool TryBuildReplayExecutor(IEventStore store, IEventDatabase database, + [NotNullWhen(true)] out IReplayExecutor? executor) + { + executor = default; + return false; + } + + public IInlineProjection BuildForInline() + { + throw new NotSupportedException("CompositeIProjectionSource does not support inline execution"); + } + + public ISubscriptionExecution BuildExecution(IEventStore store, + IEventDatabase database, ILoggerFactory loggerFactory, ShardName shardName) + { + return new CompositeIProjectionExecution(_projection, shardName); + } + + public ISubscriptionExecution BuildExecution(IEventStore store, + IEventDatabase database, ILogger logger, ShardName shardName) + { + return new CompositeIProjectionExecution(_projection, shardName); + } +} + +/// +/// A lightweight ISubscriptionExecution for IProjection instances running inside a composite. +/// Does NOT dispose the shared batch — the composite manages batch lifecycle. +/// +internal class CompositeIProjectionExecution : ISubscriptionExecution +{ + private readonly IProjection _projection; + + public CompositeIProjectionExecution(IProjection projection, ShardName shardName) + { + _projection = projection; + ShardName = shardName; + } + + public ShardName ShardName { get; } + public ShardExecutionMode Mode { get; set; } + + public async Task ProcessRangeAsync(EventRange range) + { + var batch = range.ActiveBatch as IProjectionBatch; + if (batch == null) return; + + var groups = range.Events.GroupBy(x => x.TenantId).ToArray(); + foreach (var group in groups) + { + await using var session = batch.SessionForTenant(group.Key); + await _projection.ApplyAsync(session, group.ToList(), CancellationToken.None).ConfigureAwait(false); + } + } + + public ValueTask EnqueueAsync(EventPage page, ISubscriptionAgent subscriptionAgent) => new(); + public Task StopAndDrainAsync(CancellationToken token) => Task.CompletedTask; + public Task HardStopAsync() => Task.CompletedTask; + + public bool TryBuildReplayExecutor([NotNullWhen(true)] out IReplayExecutor? executor) + { + executor = default; + return false; + } + + public Task ProcessImmediatelyAsync(SubscriptionAgent subscriptionAgent, EventPage events, + CancellationToken cancellation) => Task.CompletedTask; + + public bool TryGetAggregateCache([NotNullWhen(true)] out IAggregateCaching? caching) + { + caching = null; + return false; + } + + public ValueTask DisposeAsync() => new(); +}