Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<PackageVersion Include="FSharp.Core" Version="9.0.100" />
<PackageVersion Include="FSharp.SystemTextJson" Version="1.3.13" />
<PackageVersion Include="JasperFx" Version="1.23.0" />
<PackageVersion Include="JasperFx.Events" Version="1.25.0" />
<PackageVersion Include="JasperFx.Events" Version="1.26.0" />
<PackageVersion Include="JasperFx.Events.SourceGenerator" Version="1.4.0" />
<PackageVersion Include="JasperFx.RuntimeCompiler" Version="4.4.0" />
<PackageVersion Include="Jil" Version="3.0.0-alpha2" />
Expand Down
83 changes: 83 additions & 0 deletions docs/events/projections/event-projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,3 +241,86 @@ public partial class TrackedEventProjection: EventProjection
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Examples/TrackedEventProjection.cs#L32-L80' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_enable_document_tracking_in_event_projection' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Event Enrichment <Badge type="tip" text="8.29" />

`EventProjection` supports an `EnrichEventsAsync` hook that runs **before** individual events
are processed. This allows you to batch-load reference data from the database and enrich events
with it, avoiding N+1 query problems.

This is the same pattern available on [aggregation projections](/events/projections/enrichment),
now extended to `EventProjection`.

### Basic Usage

Override `EnrichEventsAsync` in your `EventProjection` subclass:

```cs
public class TaskSummaryProjection : EventProjection
{
public TaskSummaryProjection()
{
// The Project handler reads UserName that was set by EnrichEventsAsync
Project<TaskAssigned>((e, ops) =>
{
ops.Store(new TaskSummary
{
Id = e.TaskId,
AssignedUserName = e.UserName
});
});
}

public override async Task EnrichEventsAsync(
IQuerySession querySession,
IReadOnlyList<IEvent> events,
CancellationToken cancellation)
{
// 1. Find events that need enrichment
var assigned = events
.OfType<IEvent<TaskAssigned>>()
.ToArray();

if (assigned.Length == 0) return;

// 2. Batch-load reference data (one query, not N queries)
var userIds = assigned
.Select(e => e.Data.UserId)
.Distinct()
.ToArray();

var users = await querySession
.LoadManyAsync<User>(cancellation, userIds);
var lookup = users.ToDictionary(u => u.Id);

// 3. Set enriched properties on event data
foreach (var e in assigned)
{
if (lookup.TryGetValue(e.Data.UserId, out var user))
{
e.Data.UserName = $"{user.FirstName} {user.LastName}";
}
}
}
}
```

### How It Works

* `EnrichEventsAsync` is called **once per tenant batch** before any individual event handlers run
* The `querySession` parameter provides read access to the database for loading reference data
* Modifications to `e.Data` properties are visible to `Project<T>` and `ProjectAsync<T>` handlers
* Works with both **Inline** and **Async** projection lifecycles
* The method has a no-op default implementation -- only override it when you need enrichment

### When to Use

Use `EnrichEventsAsync` when your `EventProjection` handlers need data that isn't in the
event itself. Common scenarios:

* Looking up user names, product details, or other reference data by ID
* Resolving business keys to internal identifiers
* Loading configuration or lookup tables needed during projection

Without enrichment, each handler would need to load this data individually, resulting in
N+1 database queries when processing a batch of events.
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Events;
using JasperFx.Events.Projections;
using Marten;
using Marten.Events.Projections;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Projections;

public class event_projection_enrichment_tests : OneOffConfigurationsContext
{
[Fact]
public async Task enrichment_sets_data_before_apply_inline()
{
StoreOptions(opts =>
{
opts.Projections.Add(new SimpleEnrichmentProjection(), ProjectionLifecycle.Inline);
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

// Use TaskAssigned as the only event — enrichment sets UserName,
// ProjectAsync reads it and stores a TaskSummary
var taskId = Guid.NewGuid();
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(taskId,
new TaskAssigned { TaskId = taskId, UserId = Guid.NewGuid() });
await session.SaveChangesAsync();
}

await using (var query = theStore.QuerySession())
{
var summary = await query.LoadAsync<TaskSummary>(taskId);
summary.ShouldNotBeNull();
// The enrichment hardcodes the name — if set, enrichment ran before Apply
summary.AssignedUserName.ShouldBe("Enriched User");
}
}

[Fact]
public async Task enrichment_with_database_lookup_inline()
{
StoreOptions(opts =>
{
opts.Projections.Add(new DatabaseLookupEnrichmentProjection(), ProjectionLifecycle.Inline);
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

// Pre-store a User
var userId = Guid.NewGuid();
await using (var session = theStore.LightweightSession())
{
session.Store(new User { Id = userId, FirstName = "Alice", LastName = "Smith" });
await session.SaveChangesAsync();
}

var taskId = Guid.NewGuid();
await using (var session = theStore.LightweightSession())
{
session.Events.StartStream(taskId,
new TaskAssigned { TaskId = taskId, UserId = userId });
await session.SaveChangesAsync();
}

await using (var query = theStore.QuerySession())
{
var summary = await query.LoadAsync<TaskSummary>(taskId);
summary.ShouldNotBeNull();
summary.AssignedUserName.ShouldBe("Alice Smith");
}
}

[Fact]
public async Task enrichment_is_called_before_apply()
{
var callOrder = new List<string>();

StoreOptions(opts =>
{
opts.Projections.Add(
new CallOrderTrackingProjection(callOrder),
ProjectionLifecycle.Inline);
});

await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync();

var streamId = Guid.NewGuid();
await using var session = theStore.LightweightSession();
session.Events.StartStream(streamId, new TaskCreated { TaskId = streamId, Title = "Test" });
await session.SaveChangesAsync();

callOrder.ShouldBe(new[] { "EnrichEventsAsync", "Apply:TaskCreated" });
}
}

#region Test Events

public class TaskCreated
{
public Guid TaskId { get; set; }
public string Title { get; set; } = "";
}

public class TaskAssigned
{
public Guid TaskId { get; set; }
public Guid UserId { get; set; }
public string? UserName { get; set; }
}

#endregion

#region Test Documents

public class TaskSummary
{
public Guid Id { get; set; }
public string Title { get; set; } = "";
public string? AssignedUserName { get; set; }
}

#endregion

#region Simple Enrichment (no DB lookup)

public class SimpleEnrichmentProjection : EventProjection
{
public SimpleEnrichmentProjection()
{
// TaskAssigned handler reads UserName that was set by EnrichEventsAsync
Project<TaskAssigned>((e, ops) =>
{
ops.Store(new TaskSummary
{
Id = e.TaskId,
AssignedUserName = e.UserName
});
});
}

public override Task EnrichEventsAsync(IQuerySession querySession,
IReadOnlyList<IEvent> events, CancellationToken cancellation)
{
foreach (var e in events.OfType<IEvent<TaskAssigned>>())
{
e.Data.UserName = "Enriched User";
}
return Task.CompletedTask;
}
}

#endregion

#region Database Lookup Enrichment

public class DatabaseLookupEnrichmentProjection : EventProjection
{
public DatabaseLookupEnrichmentProjection()
{
// Stores a TaskSummary using the enriched UserName
Project<TaskAssigned>((e, ops) =>
{
ops.Store(new TaskSummary
{
Id = e.TaskId,
AssignedUserName = e.UserName
});
});
}

public override async Task EnrichEventsAsync(IQuerySession querySession,
IReadOnlyList<IEvent> events, CancellationToken cancellation)
{
var assigned = events.OfType<IEvent<TaskAssigned>>().ToArray();
if (assigned.Length == 0) return;

var userIds = assigned.Select(e => e.Data.UserId).Distinct().ToArray();
var users = await querySession.LoadManyAsync<User>(cancellation, userIds);
var lookup = users.ToDictionary(u => u.Id);

foreach (var e in assigned)
{
if (lookup.TryGetValue(e.Data.UserId, out var user))
{
e.Data.UserName = $"{user.FirstName} {user.LastName}";
}
}
}
}

#endregion

#region Call Order Tracking

public class CallOrderTrackingProjection : EventProjection
{
private readonly List<string> _callOrder;

public CallOrderTrackingProjection(List<string> callOrder)
{
_callOrder = callOrder;

Project<TaskCreated>((e, ops) =>
{
_callOrder.Add($"Apply:{nameof(TaskCreated)}");
});
}

public override Task EnrichEventsAsync(IQuerySession querySession,
IReadOnlyList<IEvent> events, CancellationToken cancellation)
{
_callOrder.Add(nameof(EnrichEventsAsync));
return Task.CompletedTask;
}
}

#endregion
Loading