From 185254bd8441a5306617c4f7982948ade867b9fe Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 14 Apr 2026 17:50:58 -0500 Subject: [PATCH] Add ProjectLatest API to project aggregates with pending events (#4249) Adds session.Events.ProjectLatest(id) which returns the projected aggregate state including uncommitted events from the current session. For inline projections, the updated document is also stored in the session so it persists on SaveChangesAsync. This eliminates the forced SaveChangesAsync + FetchLatest round-trip pattern in command handlers that need the projected state immediately after appending events. Co-Authored-By: Claude Opus 4.6 (1M context) --- docs/.vitepress/config.mts | 1 + docs/events/projections/project-latest.md | 109 ++++++++ .../Projections/project_latest_tests.cs | 232 ++++++++++++++++++ .../Events/EventStore.FetchForWriting.cs | 18 ++ .../Fetching/FetchAsyncPlan.ForReading.cs | 19 ++ .../Fetching/FetchInlinedPlan.ForReading.cs | 31 +++ .../Fetching/FetchLivePlan.ForReading.cs | 14 ++ .../Events/Fetching/FetchNaturalKeyPlan.cs | 10 + src/Marten/Events/Fetching/FetchPlanHelper.cs | 28 +++ src/Marten/Events/IEventStoreOperations.cs | 24 ++ 10 files changed, 486 insertions(+) create mode 100644 docs/events/projections/project-latest.md create mode 100644 src/EventSourcingTests/Projections/project_latest_tests.cs create mode 100644 src/Marten/Events/Fetching/FetchPlanHelper.cs diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 9777e3d7d9..6ca794cf3a 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -225,6 +225,7 @@ const config: UserConfig = { { text: 'EF Core Projections', link: '/events/projections/efcore' }, { text: 'Projections and IoC Services', link: '/events/projections/ioc' }, { text: 'Ancillary Stores in Projections', link: '/events/projections/ancillary-stores' }, + { text: 'ProjectLatest — Include Pending Events', link: '/events/projections/project-latest' }, { text: 'Async Daemon HealthChecks', link: '/events/projections/healthchecks' },] }, { diff --git a/docs/events/projections/project-latest.md b/docs/events/projections/project-latest.md new file mode 100644 index 0000000000..ca3b215c07 --- /dev/null +++ b/docs/events/projections/project-latest.md @@ -0,0 +1,109 @@ +# ProjectLatest — Include Pending Events + +`ProjectLatest()` returns the projected state of an aggregate including any events that have been +appended in the current session but not yet committed. This eliminates the need for a forced +`SaveChangesAsync()` + `FetchLatest()` round-trip when you need the projected result immediately +after appending events. + +## Motivation + +A common pattern in command handlers looks like this: + +```csharp +// Today's pattern: forced flush + re-read +session.Events.StartStream(id, new ReportCreated("Q1")); +await session.SaveChangesAsync(ct); // forced flush +var report = await session.Events.FetchLatest(id, ct); // re-read +return report; +``` + +With `ProjectLatest`, this becomes: + +```csharp +// Better: project locally including pending events +session.Events.StartStream(id, new ReportCreated("Q1")); +var report = await session.Events.ProjectLatest(id, ct); +// SaveChangesAsync happens later (e.g., Wolverine AutoApplyTransactions) +return report; +``` + +## API + +```csharp +// On IDocumentSession.Events (IEventStoreOperations) +ValueTask ProjectLatest(Guid id, CancellationToken cancellation = default); +ValueTask ProjectLatest(string id, CancellationToken cancellation = default); +``` + +## Behavior by Projection Lifecycle + +### Live Projections + +1. Fetches all committed events from the database and builds the aggregate +2. Finds any pending (uncommitted) events for that stream in the current session +3. Applies the pending events on top of the committed state +4. Returns the result (no storage — live projections are ephemeral) + +### Inline Projections + +1. Loads the pre-projected document from the database +2. Finds any pending events for that stream in the current session +3. Applies the pending events on top using the aggregate's Apply/Create methods +4. **Stores the updated document in the session** so it will be persisted on the next `SaveChangesAsync()` +5. Returns the result + +### Async Projections + +Same behavior as inline: loads the stored document, applies pending events, stores the updated +document in the session. + +## Example + +```csharp +public record ReportCreated(string Title); +public record SectionAdded(string SectionName); +public record ReportPublished; + +public class Report +{ + public Guid Id { get; set; } + public string Title { get; set; } = ""; + public int SectionCount { get; set; } + public bool IsPublished { get; set; } + + public static Report Create(ReportCreated e) => new Report { Title = e.Title }; + public void Apply(SectionAdded e) => SectionCount++; + public void Apply(ReportPublished e) => IsPublished = true; +} + +// In a command handler: +await using var session = store.LightweightSession(); + +session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue"), + new SectionAdded("Costs") +); + +// Get the projected state WITHOUT saving first +var report = await session.Events.ProjectLatest(streamId); + +// report.Title == "Q1 Report" +// report.SectionCount == 2 +// report.IsPublished == false + +// Save happens later — the inline document is already queued for storage +await session.SaveChangesAsync(); +``` + +## When No Pending Events Exist + +If there are no uncommitted events for the given stream in the session, `ProjectLatest` behaves +identically to `FetchLatest` — it returns the current committed state. + +## Limitations + +- **Natural key projections**: `ProjectLatest` with a natural key ID falls back to `FetchLatest` + because the natural key mapping may not exist yet for uncommitted streams. +- **Read-only sessions**: `ProjectLatest` is only available on `IDocumentSession.Events` + (not `IQuerySession.Events`) because it may store the updated document for inline projections. diff --git a/src/EventSourcingTests/Projections/project_latest_tests.cs b/src/EventSourcingTests/Projections/project_latest_tests.cs new file mode 100644 index 0000000000..49213ee9ae --- /dev/null +++ b/src/EventSourcingTests/Projections/project_latest_tests.cs @@ -0,0 +1,232 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Events; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Testing.Harness; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Projections; + +#region sample_project_latest_events_and_aggregate + +public record ReportCreated(string Title); +public record SectionAdded(string SectionName); +public record ReportPublished; + +public class Report +{ + public Guid Id { get; set; } + public string Title { get; set; } = ""; + public int SectionCount { get; set; } + public bool IsPublished { get; set; } + + // Self-aggregating methods for LiveStreamAggregation + public static Report Create(ReportCreated e) => new Report { Title = e.Title }; + public void Apply(SectionAdded e) => SectionCount++; + public void Apply(ReportPublished e) => IsPublished = true; +} + +public class ReportProjection : SingleStreamProjection +{ + public Report Create(ReportCreated e) => new Report { Title = e.Title }; + + public void Apply(SectionAdded e, Report report) => report.SectionCount++; + + public void Apply(ReportPublished e, Report report) => report.IsPublished = true; +} + +#endregion + +public class project_latest_tests : OneOffConfigurationsContext +{ + [Fact] + public async Task live_projection_includes_pending_events() + { + StoreOptions(opts => + { + opts.Projections.LiveStreamAggregation(); + }); + + var streamId = Guid.NewGuid(); + + await using var session = theStore.LightweightSession(); + + // Append events but do NOT save + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue"), + new SectionAdded("Costs") + ); + + // ProjectLatest should include the pending events + var report = await session.Events.ProjectLatest(streamId); + + report.ShouldNotBeNull(); + report.Title.ShouldBe("Q1 Report"); + report.SectionCount.ShouldBe(2); + } + + [Fact] + public async Task inline_projection_includes_pending_events() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + }); + + var streamId = Guid.NewGuid(); + + await using var session = theStore.LightweightSession(); + + // Append events but do NOT save + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue") + ); + + // ProjectLatest should include the pending events + var report = await session.Events.ProjectLatest(streamId); + + report.ShouldNotBeNull(); + report.Title.ShouldBe("Q1 Report"); + report.SectionCount.ShouldBe(1); + } + + [Fact] + public async Task inline_projection_stores_document_on_project_latest() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + }); + + var streamId = Guid.NewGuid(); + + // First, save some initial events + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue") + ); + await session.SaveChangesAsync(); + } + + // Now open a new session, append more events, and call ProjectLatest + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, + new SectionAdded("Costs"), + new ReportPublished() + ); + + var report = await session.Events.ProjectLatest(streamId); + + report.ShouldNotBeNull(); + report.SectionCount.ShouldBe(2); // Revenue + Costs + report.IsPublished.ShouldBeTrue(); + + // Now save - the inline-projected document should be updated + await session.SaveChangesAsync(); + } + + // Verify the document was persisted with the projected state + await using (var query = theStore.QuerySession()) + { + var report = await query.LoadAsync(streamId); + report.ShouldNotBeNull(); + report.SectionCount.ShouldBe(2); + report.IsPublished.ShouldBeTrue(); + } + } + + [Fact] + public async Task no_pending_events_behaves_like_fetch_latest() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + }); + + var streamId = Guid.NewGuid(); + + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue") + ); + await session.SaveChangesAsync(); + } + + // No pending events in this session + await using (var session = theStore.LightweightSession()) + { + var fromProjectLatest = await session.Events.ProjectLatest(streamId); + var fromFetchLatest = await session.Events.FetchLatest(streamId); + + fromProjectLatest.ShouldNotBeNull(); + fromFetchLatest.ShouldNotBeNull(); + fromProjectLatest.Title.ShouldBe(fromFetchLatest.Title); + fromProjectLatest.SectionCount.ShouldBe(fromFetchLatest.SectionCount); + } + } + + [Fact] + public async Task live_projection_with_committed_and_pending_events() + { + StoreOptions(opts => + { + opts.Projections.LiveStreamAggregation(); + }); + + var streamId = Guid.NewGuid(); + + // First commit some events + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue") + ); + await session.SaveChangesAsync(); + } + + // Now append more without saving and project + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, + new SectionAdded("Costs"), + new SectionAdded("Outlook"), + new ReportPublished() + ); + + var report = await session.Events.ProjectLatest(streamId); + + report.ShouldNotBeNull(); + report.Title.ShouldBe("Q1 Report"); + report.SectionCount.ShouldBe(3); // Revenue + Costs + Outlook + report.IsPublished.ShouldBeTrue(); + } + } + + [Fact] + public async Task returns_null_for_nonexistent_stream_with_no_pending_events() + { + StoreOptions(opts => + { + opts.Projections.LiveStreamAggregation(); + }); + + await using var session = theStore.LightweightSession(); + + var report = await session.Events.ProjectLatest(Guid.NewGuid()); + report.ShouldBeNull(); + } +} diff --git a/src/Marten/Events/EventStore.FetchForWriting.cs b/src/Marten/Events/EventStore.FetchForWriting.cs index 2a3db152b7..ebf68b9324 100644 --- a/src/Marten/Events/EventStore.FetchForWriting.cs +++ b/src/Marten/Events/EventStore.FetchForWriting.cs @@ -207,6 +207,18 @@ public Task> FetchForExclusiveWriting(string key, return plan.FetchForReading(_session, id, cancellation); } + public ValueTask ProjectLatest(Guid id, CancellationToken cancellation = default) where T : class + { + var plan = FindFetchPlan(); + return plan.ProjectLatest(_session, id, cancellation); + } + + public ValueTask ProjectLatest(string id, CancellationToken cancellation = default) where T : class + { + var plan = FindFetchPlan(); + return plan.ProjectLatest(_session, id, cancellation); + } + public Task StreamLatestJson(Guid id, Stream destination, CancellationToken cancellation = default) where T : class { var plan = FindFetchPlan(); @@ -332,6 +344,12 @@ Task> FetchForWriting(DocumentSessionBase session, TId id, lo ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation); + /// + /// Fetch the projected aggregate including any uncommitted events in the session. + /// For inline projections, the updated document is also stored in the session. + /// + ValueTask ProjectLatest(DocumentSessionBase session, TId id, CancellationToken cancellation); + Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation); // These two methods are for batching diff --git a/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs b/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs index 58a212d9f0..b86f9da479 100644 --- a/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs +++ b/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs @@ -59,6 +59,25 @@ internal partial class FetchAsyncPlan return await readLatest(session, id, cancellation, loadHandler, reader, selector).ConfigureAwait(false); } + public async ValueTask ProjectLatest(DocumentSessionBase session, TId id, CancellationToken cancellation) + { + var snapshot = await FetchForReading(session, id, cancellation).ConfigureAwait(false); + + var pendingEvents = FetchPlanHelper.FindPendingEvents(session, id); + if (pendingEvents is not { Count: > 0 }) return snapshot; + + snapshot = await _aggregator.BuildAsync(pendingEvents, session, snapshot, id, _storage, cancellation) + .ConfigureAwait(false); + + // Store the updated document so it persists when the session commits + if (snapshot != null) + { + session.Store(snapshot); + } + + return snapshot; + } + public async Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation) { await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs index 52ab9d553c..02cde3cbc4 100644 --- a/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs +++ b/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs @@ -1,6 +1,9 @@ +using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using JasperFx.Core.Reflection; +using JasperFx.Events.Aggregation; using Marten.Internal.Sessions; using Marten.Internal.Storage; using Marten.Linq.QueryHandlers; @@ -35,6 +38,34 @@ internal partial class FetchInlinedPlan return document; } + public async ValueTask ProjectLatest(DocumentSessionBase session, TId id, CancellationToken cancellation) + { + var snapshot = await FetchForReading(session, id, cancellation).ConfigureAwait(false); + + var pendingEvents = FetchPlanHelper.FindPendingEvents(session, id); + if (pendingEvents is not { Count: > 0 }) return snapshot; + + // Build the aggregator on demand + var raw = session.Options.Projections.AggregatorFor(); + var storage = findDocumentStorage(session); + var aggregator = raw as IAggregator + ?? typeof(IdentityForwardingAggregator<,,,>) + .CloseAndBuildAs>(raw, storage, typeof(TDoc), + storage is IDocumentStorage s ? s.IdType : typeof(TId), + typeof(TId), typeof(IQuerySession)); + + snapshot = await aggregator.BuildAsync(pendingEvents, session, snapshot, id, storage, cancellation) + .ConfigureAwait(false); + + // Store the updated document so it persists when the session commits + if (snapshot != null) + { + session.Store(snapshot); + } + + return snapshot; + } + public async Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation) { var storage = findDocumentStorage(session); diff --git a/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs b/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs index 0c8faac68a..6546b34194 100644 --- a/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs +++ b/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs @@ -45,6 +45,20 @@ internal partial class FetchLivePlan return await _aggregator.BuildAsync(events, session, default, id, _documentStorage, cancellation).ConfigureAwait(false); } + public async ValueTask ProjectLatest(DocumentSessionBase session, TId id, CancellationToken cancellation) + { + var snapshot = await FetchForReading(session, id, cancellation).ConfigureAwait(false); + + var pendingEvents = FetchPlanHelper.FindPendingEvents(session, id); + if (pendingEvents is { Count: > 0 }) + { + snapshot = await _aggregator.BuildAsync(pendingEvents, session, snapshot, id, _documentStorage, cancellation) + .ConfigureAwait(false); + } + + return snapshot; + } + public async Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation) { var aggregate = await FetchForReading(session, id, cancellation).ConfigureAwait(false); diff --git a/src/Marten/Events/Fetching/FetchNaturalKeyPlan.cs b/src/Marten/Events/Fetching/FetchNaturalKeyPlan.cs index f760acfe69..fda9e8c13e 100644 --- a/src/Marten/Events/Fetching/FetchNaturalKeyPlan.cs +++ b/src/Marten/Events/Fetching/FetchNaturalKeyPlan.cs @@ -207,6 +207,16 @@ await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait } } + public async ValueTask ProjectLatest(DocumentSessionBase session, TNaturalKey id, + CancellationToken cancellation) + { + // For natural keys, we cannot reliably find pending events because they are + // tracked by stream ID (Guid/string), not by natural key. The natural key mapping + // is typically created by the inline projection itself, which hasn't run yet for + // uncommitted events. Fall back to FetchForReading. + return await FetchForReading(session, id, cancellation).ConfigureAwait(false); + } + public async Task StreamForReading(DocumentSessionBase session, TNaturalKey id, Stream destination, CancellationToken cancellation) { diff --git a/src/Marten/Events/Fetching/FetchPlanHelper.cs b/src/Marten/Events/Fetching/FetchPlanHelper.cs new file mode 100644 index 0000000000..0e6040c32e --- /dev/null +++ b/src/Marten/Events/Fetching/FetchPlanHelper.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using JasperFx.Events; +using Marten.Internal.Sessions; + +namespace Marten.Events.Fetching; + +internal static class FetchPlanHelper +{ + /// + /// Find any events that have been appended to the given stream in the current + /// session but not yet committed via SaveChangesAsync(). + /// + public static IReadOnlyList? FindPendingEvents(DocumentSessionBase session, TId id) + { + if (id is Guid guidId && session.WorkTracker.TryFindStream(guidId, out var guidStream)) + { + return guidStream.Events; + } + + if (id is string stringId && session.WorkTracker.TryFindStream(stringId, out var stringStream)) + { + return stringStream.Events; + } + + return null; + } +} diff --git a/src/Marten/Events/IEventStoreOperations.cs b/src/Marten/Events/IEventStoreOperations.cs index eb564feec0..879e943c82 100644 --- a/src/Marten/Events/IEventStoreOperations.cs +++ b/src/Marten/Events/IEventStoreOperations.cs @@ -382,6 +382,30 @@ Task WriteExclusivelyToAggregate(string id, Func, Task> writi /// ValueTask FetchLatest(string id, CancellationToken cancellation = default) where T : class; + /// + /// Fetch the projected aggregate T by id, including any events appended + /// in this session that have not yet been committed. For inline projections, + /// the updated document is also stored in the session so it will be persisted + /// on the next SaveChangesAsync() call. + /// + /// + /// + /// + /// + ValueTask ProjectLatest(Guid id, CancellationToken cancellation = default) where T : class; + + /// + /// Fetch the projected aggregate T by id, including any events appended + /// in this session that have not yet been committed. For inline projections, + /// the updated document is also stored in the session so it will be persisted + /// on the next SaveChangesAsync() call. + /// + /// + /// + /// + /// + ValueTask ProjectLatest(string id, CancellationToken cancellation = default) where T : class; + /// /// Stream the raw JSON of the projected aggregate T by id directly to a destination stream. /// This avoids any deserialization/serialization round-trip when the aggregate is stored inline or