Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ const config: UserConfig<DefaultTheme.Config> = {
{ text: 'EF Core Projections', link: '/events/projections/efcore' },
{ text: 'Projections and IoC Services', link: '/events/projections/ioc' },
{ text: 'Ancillary Stores in Projections', link: '/events/projections/ancillary-stores' },
{ text: 'ProjectLatest — Include Pending Events', link: '/events/projections/project-latest' },
{ text: 'Async Daemon HealthChecks', link: '/events/projections/healthchecks' },]
},
{
Expand Down
109 changes: 109 additions & 0 deletions docs/events/projections/project-latest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# ProjectLatest — Include Pending Events <Badge type="tip" text="8.x" />

`ProjectLatest<T>()` returns the projected state of an aggregate including any events that have been
appended in the current session but not yet committed. This eliminates the need for a forced
`SaveChangesAsync()` + `FetchLatest()` round-trip when you need the projected result immediately
after appending events.

## Motivation

A common pattern in command handlers looks like this:

```csharp
// Today's pattern: forced flush + re-read
session.Events.StartStream<Report>(id, new ReportCreated("Q1"));
await session.SaveChangesAsync(ct); // forced flush
var report = await session.Events.FetchLatest<Report>(id, ct); // re-read
return report;
```

With `ProjectLatest`, this becomes:

```csharp
// Better: project locally including pending events
session.Events.StartStream<Report>(id, new ReportCreated("Q1"));
var report = await session.Events.ProjectLatest<Report>(id, ct);
// SaveChangesAsync happens later (e.g., Wolverine AutoApplyTransactions)
return report;
```

## API

```csharp
// On IDocumentSession.Events (IEventStoreOperations)
ValueTask<T?> ProjectLatest<T>(Guid id, CancellationToken cancellation = default);
ValueTask<T?> ProjectLatest<T>(string id, CancellationToken cancellation = default);
```

## Behavior by Projection Lifecycle

### Live Projections

1. Fetches all committed events from the database and builds the aggregate
2. Finds any pending (uncommitted) events for that stream in the current session
3. Applies the pending events on top of the committed state
4. Returns the result (no storage — live projections are ephemeral)

### Inline Projections

1. Loads the pre-projected document from the database
2. Finds any pending events for that stream in the current session
3. Applies the pending events on top using the aggregate's Apply/Create methods
4. **Stores the updated document in the session** so it will be persisted on the next `SaveChangesAsync()`
5. Returns the result

### Async Projections

Same behavior as inline: loads the stored document, applies pending events, stores the updated
document in the session.

## Example

```csharp
public record ReportCreated(string Title);
public record SectionAdded(string SectionName);
public record ReportPublished;

public class Report
{
public Guid Id { get; set; }
public string Title { get; set; } = "";
public int SectionCount { get; set; }
public bool IsPublished { get; set; }

public static Report Create(ReportCreated e) => new Report { Title = e.Title };
public void Apply(SectionAdded e) => SectionCount++;
public void Apply(ReportPublished e) => IsPublished = true;
}

// In a command handler:
await using var session = store.LightweightSession();

session.Events.StartStream(streamId,
new ReportCreated("Q1 Report"),
new SectionAdded("Revenue"),
new SectionAdded("Costs")
);

// Get the projected state WITHOUT saving first
var report = await session.Events.ProjectLatest<Report>(streamId);

// report.Title == "Q1 Report"
// report.SectionCount == 2
// report.IsPublished == false

// Save happens later — the inline document is already queued for storage
await session.SaveChangesAsync();
```

## When No Pending Events Exist

If there are no uncommitted events for the given stream in the session, `ProjectLatest` behaves
identically to `FetchLatest` — it returns the current committed state.

## Limitations

- **Natural key projections**: `ProjectLatest` with a natural key ID falls back to `FetchLatest`
because the natural key mapping may not exist yet for uncommitted streams.
- **Read-only sessions**: `ProjectLatest` is only available on `IDocumentSession.Events`
(not `IQuerySession.Events`) because it may store the updated document for inline projections.
232 changes: 232 additions & 0 deletions src/EventSourcingTests/Projections/project_latest_tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
using System;
using System.Linq;
using System.Threading.Tasks;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Projections;

#region sample_project_latest_events_and_aggregate

public record ReportCreated(string Title);
public record SectionAdded(string SectionName);
public record ReportPublished;

public class Report
{
public Guid Id { get; set; }
public string Title { get; set; } = "";
public int SectionCount { get; set; }
public bool IsPublished { get; set; }

// Self-aggregating methods for LiveStreamAggregation
public static Report Create(ReportCreated e) => new Report { Title = e.Title };
public void Apply(SectionAdded e) => SectionCount++;
public void Apply(ReportPublished e) => IsPublished = true;
}

public class ReportProjection : SingleStreamProjection<Report, Guid>
{
public Report Create(ReportCreated e) => new Report { Title = e.Title };

public void Apply(SectionAdded e, Report report) => report.SectionCount++;

public void Apply(ReportPublished e, Report report) => report.IsPublished = true;
}

#endregion

public class project_latest_tests : OneOffConfigurationsContext
{
[Fact]
public async Task live_projection_includes_pending_events()
{
StoreOptions(opts =>
{
opts.Projections.LiveStreamAggregation<Report>();
});

var streamId = Guid.NewGuid();

await using var session = theStore.LightweightSession();

// Append events but do NOT save
session.Events.StartStream(streamId,
new ReportCreated("Q1 Report"),
new SectionAdded("Revenue"),
new SectionAdded("Costs")
);

// ProjectLatest should include the pending events
var report = await session.Events.ProjectLatest<Report>(streamId);

report.ShouldNotBeNull();
report.Title.ShouldBe("Q1 Report");
report.SectionCount.ShouldBe(2);
}

[Fact]
public async Task inline_projection_includes_pending_events()
{
StoreOptions(opts =>
{
opts.Projections.Add<ReportProjection>(ProjectionLifecycle.Inline);
});

var streamId = Guid.NewGuid();

await using var session = theStore.LightweightSession();

// Append events but do NOT save
session.Events.StartStream(streamId,
new ReportCreated("Q1 Report"),
new SectionAdded("Revenue")
);

// ProjectLatest should include the pending events
var report = await session.Events.ProjectLatest<Report>(streamId);

report.ShouldNotBeNull();
report.Title.ShouldBe("Q1 Report");
report.SectionCount.ShouldBe(1);
}

[Fact]
public async Task inline_projection_stores_document_on_project_latest()
{
StoreOptions(opts =>
{
opts.Projections.Add<ReportProjection>(ProjectionLifecycle.Inline);
});

var streamId = Guid.NewGuid();

// First, save some initial events
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(streamId,
new ReportCreated("Q1 Report"),
new SectionAdded("Revenue")
);
await session.SaveChangesAsync();
}

// Now open a new session, append more events, and call ProjectLatest
await using (var session = theStore.LightweightSession())
{
session.Events.Append(streamId,
new SectionAdded("Costs"),
new ReportPublished()
);

var report = await session.Events.ProjectLatest<Report>(streamId);

report.ShouldNotBeNull();
report.SectionCount.ShouldBe(2); // Revenue + Costs
report.IsPublished.ShouldBeTrue();

// Now save - the inline-projected document should be updated
await session.SaveChangesAsync();
}

// Verify the document was persisted with the projected state
await using (var query = theStore.QuerySession())
{
var report = await query.LoadAsync<Report>(streamId);
report.ShouldNotBeNull();
report.SectionCount.ShouldBe(2);
report.IsPublished.ShouldBeTrue();
}
}

[Fact]
public async Task no_pending_events_behaves_like_fetch_latest()
{
StoreOptions(opts =>
{
opts.Projections.Add<ReportProjection>(ProjectionLifecycle.Inline);
});

var streamId = Guid.NewGuid();

await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(streamId,
new ReportCreated("Q1 Report"),
new SectionAdded("Revenue")
);
await session.SaveChangesAsync();
}

// No pending events in this session
await using (var session = theStore.LightweightSession())
{
var fromProjectLatest = await session.Events.ProjectLatest<Report>(streamId);
var fromFetchLatest = await session.Events.FetchLatest<Report>(streamId);

fromProjectLatest.ShouldNotBeNull();
fromFetchLatest.ShouldNotBeNull();
fromProjectLatest.Title.ShouldBe(fromFetchLatest.Title);
fromProjectLatest.SectionCount.ShouldBe(fromFetchLatest.SectionCount);
}
}

[Fact]
public async Task live_projection_with_committed_and_pending_events()
{
StoreOptions(opts =>
{
opts.Projections.LiveStreamAggregation<Report>();
});

var streamId = Guid.NewGuid();

// First commit some events
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(streamId,
new ReportCreated("Q1 Report"),
new SectionAdded("Revenue")
);
await session.SaveChangesAsync();
}

// Now append more without saving and project
await using (var session = theStore.LightweightSession())
{
session.Events.Append(streamId,
new SectionAdded("Costs"),
new SectionAdded("Outlook"),
new ReportPublished()
);

var report = await session.Events.ProjectLatest<Report>(streamId);

report.ShouldNotBeNull();
report.Title.ShouldBe("Q1 Report");
report.SectionCount.ShouldBe(3); // Revenue + Costs + Outlook
report.IsPublished.ShouldBeTrue();
}
}

[Fact]
public async Task returns_null_for_nonexistent_stream_with_no_pending_events()
{
StoreOptions(opts =>
{
opts.Projections.LiveStreamAggregation<Report>();
});

await using var session = theStore.LightweightSession();

var report = await session.Events.ProjectLatest<Report>(Guid.NewGuid());
report.ShouldBeNull();
}
}
18 changes: 18 additions & 0 deletions src/Marten/Events/EventStore.FetchForWriting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,18 @@ public Task<IEventStream<T>> FetchForExclusiveWriting<T>(string key,
return plan.FetchForReading(_session, id, cancellation);
}

public ValueTask<T?> ProjectLatest<T>(Guid id, CancellationToken cancellation = default) where T : class
{
var plan = FindFetchPlan<T, Guid>();
return plan.ProjectLatest(_session, id, cancellation);
}

public ValueTask<T?> ProjectLatest<T>(string id, CancellationToken cancellation = default) where T : class
{
var plan = FindFetchPlan<T, string>();
return plan.ProjectLatest(_session, id, cancellation);
}

public Task<bool> StreamLatestJson<T>(Guid id, Stream destination, CancellationToken cancellation = default) where T : class
{
var plan = FindFetchPlan<T, Guid>();
Expand Down Expand Up @@ -332,6 +344,12 @@ Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase session, TId id, lo

ValueTask<TDoc?> FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation);

/// <summary>
/// Fetch the projected aggregate including any uncommitted events in the session.
/// For inline projections, the updated document is also stored in the session.
/// </summary>
ValueTask<TDoc?> ProjectLatest(DocumentSessionBase session, TId id, CancellationToken cancellation);

Task<bool> StreamForReading(DocumentSessionBase session, TId id, Stream destination, CancellationToken cancellation);

// These two methods are for batching
Expand Down
Loading
Loading