Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/EventTests/Projections/SliceGroupTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public async Task enrich_with_using_entity_query()

await theGroup.EnrichWith<User>()
.ForEvent<Assigned>()
.UsingEntityQuery<string>(async (_, events, ct) =>
.EnrichUsingEntityQuery<string>((slices, events, cache, ct) =>
{
ct.ThrowIfCancellationRequested();

Expand All @@ -187,12 +187,19 @@ await theGroup.EnrichWith<User>()
.Where(theUsers.ContainsKey)
.ToDictionary(x => x, x => theUsers[x]);

return await Task.FromResult<IReadOnlyDictionary<string, User>>(dict);
})
.EnrichAsync(
e => e.Data.UserName,
(slice, e, user) => slice.ReplaceEvent(e, new AssignedToUser(user))
);
foreach (var slice in slices)
{
foreach (var e in slice.Events().OfType<IEvent<Assigned>>())
{
if (dict.TryGetValue(e.Data.UserName, out var user))
{
slice.ReplaceEvent(e, new AssignedToUser(user));
}
}
}

return Task.CompletedTask;
});

theGroup.Slices[id1].Events().OfType<IEvent<AssignedToUser>>().Single().Data.User.UserName.ShouldBe("Bill");
theGroup.Slices[id2].Events().OfType<IEvent<AssignedToUser>>().Single().Data.User.UserName.ShouldBe("Tom");
Expand Down
117 changes: 56 additions & 61 deletions src/JasperFx.Events/Grouping/SliceGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public class EntityStep<TEntity>(SliceGroup<TDoc, TId> parent, IStorageOperation
public EventStep<TEntity, TEvent> ForEvent<TEvent>() => new(parent, session);
}

public class EventStep<TEntity, TEvent>(SliceGroup<TDoc, TId> parent, IStorageOperations session) where TEvent : notnull
public class EventStep<TEntity, TEvent>(SliceGroup<TDoc, TId> parent, IStorageOperations session) where TEvent : notnull where TEntity : notnull
{
/// <summary>
/// Specify *how* the enrichment can find the identity TId from the events of type TEvent for entities
Expand All @@ -218,79 +218,74 @@ public IdentityStep<TEntity, TEvent, TEntityId> ForEntityId<TEntityId>(
public IdentityStep<TEntity, TEvent, TEntityId> ForEntityIdFromEvent<TEntityId>(
Func<IEvent<TEvent>, TEntityId> identitySource) =>
new(parent, session, identitySource);

/// <summary>
/// Configure a custom entity loading step that allows full control over how
/// related entities are fetched from Marten using LINQ or any other query logic.
/// This is intended for more complex loading scenarios than simple identity based
/// lookups, for example filtering on additional fields, loading only active
/// entities, or applying joins and includes.
/// Execute a batched enrichment operation for events of type <typeparamref name="TEvent"/>,
/// allowing the caller to perform a single lookup using any external query mechanism
/// captured in the provided delegate and apply the results directly to the active slices.
/// </summary>
/// <typeparam name="TEntityId">
/// The identifier type used to correlate events to loaded entities.
/// </typeparam>
/// <param name="loader">
/// A function that receives the current <see cref="IStorageOperations"/> instance,
/// the complete set of events of type <typeparamref name="TEvent"/> that occur
/// in the active slices, and a cancellation token.
/// The function is responsible for loading the relevant <typeparamref name="TEntity"/>
/// instances and returning them as a dictionary keyed by <typeparamref name="TEntityId"/>.
/// <remarks>
/// This method is intended for enrichment scenarios where related entities cannot be
/// resolved by a simple identity lookup, or where applying the enrichment requires custom
/// logic beyond a basic reference or replacement.
///
/// The delegate receives all slices and all matching events in the current batch, so it can:
/// query data in one round trip, match the results to events using any correlation key,
/// and then mutate the slices by referencing entities, replacing events, or adding synthetic events.
///
/// An upstream aggregate cache, if available, is passed through to the delegate but is never
/// accessed or mutated by this method. Cache usage is entirely controlled by the caller.
/// </remarks>
/// <param name="enrichment">
/// A delegate invoked once per enrichment execution. The delegate receives:
/// <list type="bullet">
/// <item>
/// <description>
/// The active <see cref="EventSlice{TDoc, TId}"/> instances for this batch
/// </description>
/// </item>
/// <item>
/// <description>
/// The complete set of events of type <typeparamref name="TEvent"/> occurring in those slices
/// </description>
/// </item>
/// <item>
/// <description>
/// An optional upstream aggregate cache for entities that the caller may consult or populate
/// </description>
/// </item>
/// <item>
/// <description>
/// A cancellation token
/// </description>
/// </item>
/// </list>
/// </param>
/// <param name="cancellation">
/// A cancellation token used to cancel the enrichment operation.
/// </param>
/// <returns>
/// A <see cref="QueryStep{TEntity, TEvent, TEntityId}"/> that can be used to apply
/// enrichment logic based on the loaded entities.
/// </returns>
public QueryStep<TEntity, TEvent, TEntityId> UsingEntityQuery<TEntityId>(
public async Task EnrichUsingEntityQuery<TEntityId>(
Func<
IStorageOperations,
IReadOnlyList<EventSlice<TDoc, TId>>,
IReadOnlyList<IEvent<TEvent>>,
IAggregateCache<TEntityId, TEntity>?,
CancellationToken,
Task<IReadOnlyDictionary<TEntityId, TEntity>>> loader)
where TEntityId : notnull =>
new(parent, session, loader);
}

public class QueryStep<TEntity, TEvent, TEntityId>(
SliceGroup<TDoc, TId> parent,
IStorageOperations session,
Func<IStorageOperations, IReadOnlyList<IEvent<TEvent>>, CancellationToken, Task<IReadOnlyDictionary<TEntityId, TEntity>>> loader)
where TEntityId : notnull where TEvent : notnull
{
public async Task EnrichAsync(
Func<IEvent<TEvent>, TEntityId> eventToEntityId,
Action<EventSlice<TDoc, TId>, IEvent<TEvent>, TEntity> application,
CancellationToken ct = default)
Task> enrichment,
CancellationToken cancellation = default)
where TEntityId : notnull
{
var allEvents = parent.Slices
var slices = parent.Slices.ToList();

var events = slices
.SelectMany(x => x.Events())
.OfType<IEvent<TEvent>>()
.ToArray();
.ToList();

if (allEvents.Length == 0) return;
if (events.Count == 0) return;

var cache = parent.findCache<TEntityId, TEntity>();
var dict = await loader(session, allEvents, ct);

foreach (var slice in parent.Slices)
{
var events = slice.Events().OfType<IEvent<TEvent>>().ToArray();
foreach (var e in events)
{
var id = eventToEntityId(e);

if (cache != null && cache.TryFind(id, out var cachedEntity))
{
application(slice, e, cachedEntity);
continue;
}

if (dict.TryGetValue(id, out var entity))
{
cache?.Store(id, entity);
application(slice, e, entity);
}
}
}
await enrichment(slices, events, cache, cancellation);
}
}

Expand Down
Loading