diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 9777e3d7d9..6ca794cf3a 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -225,6 +225,7 @@ const config: UserConfig = { { text: 'EF Core Projections', link: '/events/projections/efcore' }, { text: 'Projections and IoC Services', link: '/events/projections/ioc' }, { text: 'Ancillary Stores in Projections', link: '/events/projections/ancillary-stores' }, + { text: 'ProjectLatest — Include Pending Events', link: '/events/projections/project-latest' }, { text: 'Async Daemon HealthChecks', link: '/events/projections/healthchecks' },] }, { diff --git a/docs/events/projections/project-latest.md b/docs/events/projections/project-latest.md new file mode 100644 index 0000000000..ca3b215c07 --- /dev/null +++ b/docs/events/projections/project-latest.md @@ -0,0 +1,109 @@ +# ProjectLatest — Include Pending Events + +`ProjectLatest()` returns the projected state of an aggregate including any events that have been +appended in the current session but not yet committed. This eliminates the need for a forced +`SaveChangesAsync()` + `FetchLatest()` round-trip when you need the projected result immediately +after appending events. + +## Motivation + +A common pattern in command handlers looks like this: + +```csharp +// Today's pattern: forced flush + re-read +session.Events.StartStream(id, new ReportCreated("Q1")); +await session.SaveChangesAsync(ct); // forced flush +var report = await session.Events.FetchLatest(id, ct); // re-read +return report; +``` + +With `ProjectLatest`, this becomes: + +```csharp +// Better: project locally including pending events +session.Events.StartStream(id, new ReportCreated("Q1")); +var report = await session.Events.ProjectLatest(id, ct); +// SaveChangesAsync happens later (e.g., Wolverine AutoApplyTransactions) +return report; +``` + +## API + +```csharp +// On IDocumentSession.Events (IEventStoreOperations) +ValueTask ProjectLatest(Guid id, CancellationToken cancellation = default); +ValueTask ProjectLatest(string id, CancellationToken cancellation = default); +``` + +## Behavior by Projection Lifecycle + +### Live Projections + +1. Fetches all committed events from the database and builds the aggregate +2. Finds any pending (uncommitted) events for that stream in the current session +3. Applies the pending events on top of the committed state +4. Returns the result (no storage — live projections are ephemeral) + +### Inline Projections + +1. Loads the pre-projected document from the database +2. Finds any pending events for that stream in the current session +3. Applies the pending events on top using the aggregate's Apply/Create methods +4. **Stores the updated document in the session** so it will be persisted on the next `SaveChangesAsync()` +5. Returns the result + +### Async Projections + +Same behavior as inline: loads the stored document, applies pending events, stores the updated +document in the session. + +## Example + +```csharp +public record ReportCreated(string Title); +public record SectionAdded(string SectionName); +public record ReportPublished; + +public class Report +{ + public Guid Id { get; set; } + public string Title { get; set; } = ""; + public int SectionCount { get; set; } + public bool IsPublished { get; set; } + + public static Report Create(ReportCreated e) => new Report { Title = e.Title }; + public void Apply(SectionAdded e) => SectionCount++; + public void Apply(ReportPublished e) => IsPublished = true; +} + +// In a command handler: +await using var session = store.LightweightSession(); + +session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue"), + new SectionAdded("Costs") +); + +// Get the projected state WITHOUT saving first +var report = await session.Events.ProjectLatest(streamId); + +// report.Title == "Q1 Report" +// report.SectionCount == 2 +// report.IsPublished == false + +// Save happens later — the inline document is already queued for storage +await session.SaveChangesAsync(); +``` + +## When No Pending Events Exist + +If there are no uncommitted events for the given stream in the session, `ProjectLatest` behaves +identically to `FetchLatest` — it returns the current committed state. + +## Limitations + +- **Natural key projections**: `ProjectLatest` with a natural key ID falls back to `FetchLatest` + because the natural key mapping may not exist yet for uncommitted streams. +- **Read-only sessions**: `ProjectLatest` is only available on `IDocumentSession.Events` + (not `IQuerySession.Events`) because it may store the updated document for inline projections. diff --git a/src/EventSourcingTests/Projections/project_latest_tests.cs b/src/EventSourcingTests/Projections/project_latest_tests.cs new file mode 100644 index 0000000000..49213ee9ae --- /dev/null +++ b/src/EventSourcingTests/Projections/project_latest_tests.cs @@ -0,0 +1,232 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using JasperFx.Events; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Testing.Harness; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Projections; + +#region sample_project_latest_events_and_aggregate + +public record ReportCreated(string Title); +public record SectionAdded(string SectionName); +public record ReportPublished; + +public class Report +{ + public Guid Id { get; set; } + public string Title { get; set; } = ""; + public int SectionCount { get; set; } + public bool IsPublished { get; set; } + + // Self-aggregating methods for LiveStreamAggregation + public static Report Create(ReportCreated e) => new Report { Title = e.Title }; + public void Apply(SectionAdded e) => SectionCount++; + public void Apply(ReportPublished e) => IsPublished = true; +} + +public class ReportProjection : SingleStreamProjection +{ + public Report Create(ReportCreated e) => new Report { Title = e.Title }; + + public void Apply(SectionAdded e, Report report) => report.SectionCount++; + + public void Apply(ReportPublished e, Report report) => report.IsPublished = true; +} + +#endregion + +public class project_latest_tests : OneOffConfigurationsContext +{ + [Fact] + public async Task live_projection_includes_pending_events() + { + StoreOptions(opts => + { + opts.Projections.LiveStreamAggregation(); + }); + + var streamId = Guid.NewGuid(); + + await using var session = theStore.LightweightSession(); + + // Append events but do NOT save + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue"), + new SectionAdded("Costs") + ); + + // ProjectLatest should include the pending events + var report = await session.Events.ProjectLatest(streamId); + + report.ShouldNotBeNull(); + report.Title.ShouldBe("Q1 Report"); + report.SectionCount.ShouldBe(2); + } + + [Fact] + public async Task inline_projection_includes_pending_events() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + }); + + var streamId = Guid.NewGuid(); + + await using var session = theStore.LightweightSession(); + + // Append events but do NOT save + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue") + ); + + // ProjectLatest should include the pending events + var report = await session.Events.ProjectLatest(streamId); + + report.ShouldNotBeNull(); + report.Title.ShouldBe("Q1 Report"); + report.SectionCount.ShouldBe(1); + } + + [Fact] + public async Task inline_projection_stores_document_on_project_latest() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + }); + + var streamId = Guid.NewGuid(); + + // First, save some initial events + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue") + ); + await session.SaveChangesAsync(); + } + + // Now open a new session, append more events, and call ProjectLatest + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, + new SectionAdded("Costs"), + new ReportPublished() + ); + + var report = await session.Events.ProjectLatest(streamId); + + report.ShouldNotBeNull(); + report.SectionCount.ShouldBe(2); // Revenue + Costs + report.IsPublished.ShouldBeTrue(); + + // Now save - the inline-projected document should be updated + await session.SaveChangesAsync(); + } + + // Verify the document was persisted with the projected state + await using (var query = theStore.QuerySession()) + { + var report = await query.LoadAsync(streamId); + report.ShouldNotBeNull(); + report.SectionCount.ShouldBe(2); + report.IsPublished.ShouldBeTrue(); + } + } + + [Fact] + public async Task no_pending_events_behaves_like_fetch_latest() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Inline); + }); + + var streamId = Guid.NewGuid(); + + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue") + ); + await session.SaveChangesAsync(); + } + + // No pending events in this session + await using (var session = theStore.LightweightSession()) + { + var fromProjectLatest = await session.Events.ProjectLatest(streamId); + var fromFetchLatest = await session.Events.FetchLatest(streamId); + + fromProjectLatest.ShouldNotBeNull(); + fromFetchLatest.ShouldNotBeNull(); + fromProjectLatest.Title.ShouldBe(fromFetchLatest.Title); + fromProjectLatest.SectionCount.ShouldBe(fromFetchLatest.SectionCount); + } + } + + [Fact] + public async Task live_projection_with_committed_and_pending_events() + { + StoreOptions(opts => + { + opts.Projections.LiveStreamAggregation(); + }); + + var streamId = Guid.NewGuid(); + + // First commit some events + await using (var session = theStore.LightweightSession()) + { + session.Events.StartStream(streamId, + new ReportCreated("Q1 Report"), + new SectionAdded("Revenue") + ); + await session.SaveChangesAsync(); + } + + // Now append more without saving and project + await using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, + new SectionAdded("Costs"), + new SectionAdded("Outlook"), + new ReportPublished() + ); + + var report = await session.Events.ProjectLatest(streamId); + + report.ShouldNotBeNull(); + report.Title.ShouldBe("Q1 Report"); + report.SectionCount.ShouldBe(3); // Revenue + Costs + Outlook + report.IsPublished.ShouldBeTrue(); + } + } + + [Fact] + public async Task returns_null_for_nonexistent_stream_with_no_pending_events() + { + StoreOptions(opts => + { + opts.Projections.LiveStreamAggregation(); + }); + + await using var session = theStore.LightweightSession(); + + var report = await session.Events.ProjectLatest(Guid.NewGuid()); + report.ShouldBeNull(); + } +} diff --git a/src/Marten/Events/EventStore.FetchForWriting.cs b/src/Marten/Events/EventStore.FetchForWriting.cs index 2a3db152b7..ebf68b9324 100644 --- a/src/Marten/Events/EventStore.FetchForWriting.cs +++ b/src/Marten/Events/EventStore.FetchForWriting.cs @@ -207,6 +207,18 @@ public Task> FetchForExclusiveWriting(string key, return plan.FetchForReading(_session, id, cancellation); } + public ValueTask ProjectLatest(Guid id, CancellationToken cancellation = default) where T : class + { + var plan = FindFetchPlan(); + return plan.ProjectLatest(_session, id, cancellation); + } + + public ValueTask ProjectLatest(string id, CancellationToken cancellation = default) where T : class + { + var plan = FindFetchPlan(); + return plan.ProjectLatest(_session, id, cancellation); + } + public Task StreamLatestJson(Guid id, Stream destination, CancellationToken cancellation = default) where T : class { var plan = FindFetchPlan(); @@ -332,6 +344,12 @@ Task> FetchForWriting(DocumentSessionBase session, TId id, lo ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation); + /// + /// Fetch the projected aggregate including any uncommitted events in the session. + /// For inline projections, the updated document is also stored in the session. + /// + ValueTask ProjectLatest(DocumentSessionBase session, TId id, CancellationToken cancellation); + Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation); // These two methods are for batching diff --git a/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs b/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs index 58a212d9f0..b86f9da479 100644 --- a/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs +++ b/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs @@ -59,6 +59,25 @@ internal partial class FetchAsyncPlan return await readLatest(session, id, cancellation, loadHandler, reader, selector).ConfigureAwait(false); } + public async ValueTask ProjectLatest(DocumentSessionBase session, TId id, CancellationToken cancellation) + { + var snapshot = await FetchForReading(session, id, cancellation).ConfigureAwait(false); + + var pendingEvents = FetchPlanHelper.FindPendingEvents(session, id); + if (pendingEvents is not { Count: > 0 }) return snapshot; + + snapshot = await _aggregator.BuildAsync(pendingEvents, session, snapshot, id, _storage, cancellation) + .ConfigureAwait(false); + + // Store the updated document so it persists when the session commits + if (snapshot != null) + { + session.Store(snapshot); + } + + return snapshot; + } + public async Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation) { await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs index 52ab9d553c..02cde3cbc4 100644 --- a/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs +++ b/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs @@ -1,6 +1,9 @@ +using System; using System.IO; using System.Threading; using System.Threading.Tasks; +using JasperFx.Core.Reflection; +using JasperFx.Events.Aggregation; using Marten.Internal.Sessions; using Marten.Internal.Storage; using Marten.Linq.QueryHandlers; @@ -35,6 +38,34 @@ internal partial class FetchInlinedPlan return document; } + public async ValueTask ProjectLatest(DocumentSessionBase session, TId id, CancellationToken cancellation) + { + var snapshot = await FetchForReading(session, id, cancellation).ConfigureAwait(false); + + var pendingEvents = FetchPlanHelper.FindPendingEvents(session, id); + if (pendingEvents is not { Count: > 0 }) return snapshot; + + // Build the aggregator on demand + var raw = session.Options.Projections.AggregatorFor(); + var storage = findDocumentStorage(session); + var aggregator = raw as IAggregator + ?? typeof(IdentityForwardingAggregator<,,,>) + .CloseAndBuildAs>(raw, storage, typeof(TDoc), + storage is IDocumentStorage s ? s.IdType : typeof(TId), + typeof(TId), typeof(IQuerySession)); + + snapshot = await aggregator.BuildAsync(pendingEvents, session, snapshot, id, storage, cancellation) + .ConfigureAwait(false); + + // Store the updated document so it persists when the session commits + if (snapshot != null) + { + session.Store(snapshot); + } + + return snapshot; + } + public async Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation) { var storage = findDocumentStorage(session); diff --git a/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs b/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs index 0c8faac68a..6546b34194 100644 --- a/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs +++ b/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs @@ -45,6 +45,20 @@ internal partial class FetchLivePlan return await _aggregator.BuildAsync(events, session, default, id, _documentStorage, cancellation).ConfigureAwait(false); } + public async ValueTask ProjectLatest(DocumentSessionBase session, TId id, CancellationToken cancellation) + { + var snapshot = await FetchForReading(session, id, cancellation).ConfigureAwait(false); + + var pendingEvents = FetchPlanHelper.FindPendingEvents(session, id); + if (pendingEvents is { Count: > 0 }) + { + snapshot = await _aggregator.BuildAsync(pendingEvents, session, snapshot, id, _documentStorage, cancellation) + .ConfigureAwait(false); + } + + return snapshot; + } + public async Task StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation) { var aggregate = await FetchForReading(session, id, cancellation).ConfigureAwait(false); diff --git a/src/Marten/Events/Fetching/FetchNaturalKeyPlan.cs b/src/Marten/Events/Fetching/FetchNaturalKeyPlan.cs index f760acfe69..fda9e8c13e 100644 --- a/src/Marten/Events/Fetching/FetchNaturalKeyPlan.cs +++ b/src/Marten/Events/Fetching/FetchNaturalKeyPlan.cs @@ -207,6 +207,16 @@ await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait } } + public async ValueTask ProjectLatest(DocumentSessionBase session, TNaturalKey id, + CancellationToken cancellation) + { + // For natural keys, we cannot reliably find pending events because they are + // tracked by stream ID (Guid/string), not by natural key. The natural key mapping + // is typically created by the inline projection itself, which hasn't run yet for + // uncommitted events. Fall back to FetchForReading. + return await FetchForReading(session, id, cancellation).ConfigureAwait(false); + } + public async Task StreamForReading(DocumentSessionBase session, TNaturalKey id, Stream destination, CancellationToken cancellation) { diff --git a/src/Marten/Events/Fetching/FetchPlanHelper.cs b/src/Marten/Events/Fetching/FetchPlanHelper.cs new file mode 100644 index 0000000000..0e6040c32e --- /dev/null +++ b/src/Marten/Events/Fetching/FetchPlanHelper.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using JasperFx.Events; +using Marten.Internal.Sessions; + +namespace Marten.Events.Fetching; + +internal static class FetchPlanHelper +{ + /// + /// Find any events that have been appended to the given stream in the current + /// session but not yet committed via SaveChangesAsync(). + /// + public static IReadOnlyList? FindPendingEvents(DocumentSessionBase session, TId id) + { + if (id is Guid guidId && session.WorkTracker.TryFindStream(guidId, out var guidStream)) + { + return guidStream.Events; + } + + if (id is string stringId && session.WorkTracker.TryFindStream(stringId, out var stringStream)) + { + return stringStream.Events; + } + + return null; + } +} diff --git a/src/Marten/Events/IEventStoreOperations.cs b/src/Marten/Events/IEventStoreOperations.cs index eb564feec0..879e943c82 100644 --- a/src/Marten/Events/IEventStoreOperations.cs +++ b/src/Marten/Events/IEventStoreOperations.cs @@ -382,6 +382,30 @@ Task WriteExclusivelyToAggregate(string id, Func, Task> writi /// ValueTask FetchLatest(string id, CancellationToken cancellation = default) where T : class; + /// + /// Fetch the projected aggregate T by id, including any events appended + /// in this session that have not yet been committed. For inline projections, + /// the updated document is also stored in the session so it will be persisted + /// on the next SaveChangesAsync() call. + /// + /// + /// + /// + /// + ValueTask ProjectLatest(Guid id, CancellationToken cancellation = default) where T : class; + + /// + /// Fetch the projected aggregate T by id, including any events appended + /// in this session that have not yet been committed. For inline projections, + /// the updated document is also stored in the session so it will be persisted + /// on the next SaveChangesAsync() call. + /// + /// + /// + /// + /// + ValueTask ProjectLatest(string id, CancellationToken cancellation = default) where T : class; + /// /// Stream the raw JSON of the projected aggregate T by id directly to a destination stream. /// This avoids any deserialization/serialization round-trip when the aggregate is stored inline or