Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions src/EventTests/Projections/SliceGroupTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,115 @@ public void try_find_upstream_cache_walks_past_non_matching_upstream_executions(
cache.ShouldBeSameAs(perTenantCache);
}

[Fact]
public async Task for_entity_ids_fans_out_one_event_to_many_references()
{
// One event references multiple users by name. AddReferences should add a
// References<User> for each id present in the cache, in order.
var sliceId = Guid.NewGuid();
theGroup.Slices[sliceId] = new EventSlice<LetterCounts, Guid>(sliceId, StorageConstants.DefaultTenantId,
new[] { Event.For(new MultipleAssigned(["Bill", "Tom", "Todd"])) });

theUsers["Bill"] = new User("Bill", "William");
theUsers["Tom"] = new User("Tom", "Thomas");
theUsers["Todd"] = new User("Todd", "Todd");

await theGroup.EnrichWith<User>()
.ForEvent<MultipleAssigned>()
.ForEntityIds(e => e.UserNames)
.AddReferences();

var refs = theGroup.Slices[sliceId].Events().OfType<IEvent<References<User>>>()
.Select(x => x.Data.Entity.UserName)
.ToArray();
refs.ShouldBe(["Bill", "Tom", "Todd"]);
}

[Fact]
public async Task for_entity_ids_skips_missing_ids()
{
// Only "Bill" exists; "Ghost" is not in the storage. AddReferences emits
// only the matched entity, not a placeholder for the miss.
var sliceId = Guid.NewGuid();
theGroup.Slices[sliceId] = new EventSlice<LetterCounts, Guid>(sliceId, StorageConstants.DefaultTenantId,
new[] { Event.For(new MultipleAssigned(["Bill", "Ghost"])) });

theUsers["Bill"] = new User("Bill", "William");

await theGroup.EnrichWith<User>()
.ForEvent<MultipleAssigned>()
.ForEntityIds(e => e.UserNames)
.AddReferences();

var refs = theGroup.Slices[sliceId].Events().OfType<IEvent<References<User>>>()
.Select(x => x.Data.Entity.UserName)
.ToArray();
refs.ShouldBe(["Bill"]);
}

[Fact]
public async Task for_entity_ids_dedupes_loads_for_same_id_across_events()
{
// Two events in the same slice both reference "Bill". The storage layer should
// see only one load for "Bill" — we dedupe ids before calling LoadManyAsync.
var sliceId = Guid.NewGuid();
theGroup.Slices[sliceId] = new EventSlice<LetterCounts, Guid>(sliceId, StorageConstants.DefaultTenantId,
new[]
{
Event.For(new MultipleAssigned(["Bill", "Tom"])),
Event.For(new MultipleAssigned(["Bill", "Todd"]))
});

theUsers["Bill"] = new User("Bill", "William");
theUsers["Tom"] = new User("Tom", "Thomas");
theUsers["Todd"] = new User("Todd", "Todd");

await theGroup.EnrichWith<User>()
.ForEvent<MultipleAssigned>()
.ForEntityIds(e => e.UserNames)
.AddReferences();

_loadedUserIds.OrderBy(x => x).ShouldBe(["Bill", "Todd", "Tom"]);
}

[Fact]
public async Task for_entity_ids_returns_nullo_cache_when_no_ids_extracted()
{
// No events of the target type → FetchEntitiesAsync returns NulloAggregateCache
// and never touches storage.
var sliceId = Guid.NewGuid();
theGroup.Slices[sliceId] = new EventSlice<LetterCounts, Guid>(sliceId, StorageConstants.DefaultTenantId,
new[] { Event.For(new MultipleAssigned([])) });

var cache = await theGroup.EnrichWith<User>()
.ForEvent<MultipleAssigned>()
.ForEntityIds(e => e.UserNames)
.FetchEntitiesAsync();

cache.ShouldBeOfType<NulloAggregateCache<string, User>>();
}

[Fact]
public async Task for_entity_ids_enrich_async_invokes_callback_per_resolved_entity()
{
// EnrichAsync fires once per (slice, event, resolved entity) — for [Bill, Ghost, Tom],
// the callback is invoked twice (Ghost is missing).
var sliceId = Guid.NewGuid();
theGroup.Slices[sliceId] = new EventSlice<LetterCounts, Guid>(sliceId, StorageConstants.DefaultTenantId,
new[] { Event.For(new MultipleAssigned(["Bill", "Ghost", "Tom"])) });

theUsers["Bill"] = new User("Bill", "William");
theUsers["Tom"] = new User("Tom", "Thomas");

var seen = new List<string>();
await theGroup.EnrichWith<User>()
.ForEvent<MultipleAssigned>()
.ForEntityIds(e => e.UserNames)
.EnrichAsync((_, _, user) => seen.Add(user.UserName));

seen.ShouldBe(["Bill", "Tom"]);
}

[Fact]
public async Task enrich_with_using_entity_query()
{
Expand Down Expand Up @@ -453,3 +562,5 @@ ValueTask<IMessageSink> IStorageOperations.GetOrStartMessageSink()
}
}

public record MultipleAssigned(string[] UserNames);

106 changes: 106 additions & 0 deletions src/JasperFx.Events/Grouping/SliceGroup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,31 @@ public IdentityStep<TEntity, TEvent, TEntityId> ForEntityIdFromEvent<TEntityId>(
Func<IEvent<TEvent>, TEntityId> identitySource) =>
new(parent, session, (_, e) => identitySource(e), disposeAfterUse);

/// <summary>
/// Specify *how* to extract a collection of identities of type <typeparamref name="TEntityId"/>
/// for entities of type <typeparamref name="TEntity"/> from each event of type
/// <typeparamref name="TEvent"/>. Use this for fan-out enrichment where a single event references
/// multiple entities of the same type — for example an event carrying a list of foreign-key ids.
/// </summary>
/// <param name="identitiesSource">
/// Returns the entity ids referenced by a single event. May be empty, may contain duplicates
/// (duplicates within an event are passed through to the application callback as-is so callers
/// retain control; ids are de-duplicated when fetching from storage to avoid redundant loads).
/// </param>
/// <typeparam name="TEntityId"></typeparam>
public MultiIdentityStep<TEntity, TEvent, TEntityId> ForEntityIds<TEntityId>(
Func<TEvent, IEnumerable<TEntityId>> identitiesSource) =>
new(parent, session, (_, e) => identitiesSource(e.Data), disposeAfterUse);

/// <summary>
/// Like <see cref="ForEntityIds{TEntityId}(Func{TEvent, IEnumerable{TEntityId}})"/> but the
/// callback receives the wrapper <see cref="IEvent{T}"/> rather than the unwrapped event payload,
/// so it can also reach event headers, sequence, timestamps, etc.
/// </summary>
public MultiIdentityStep<TEntity, TEvent, TEntityId> ForEntityIdsFromEvent<TEntityId>(
Func<IEvent<TEvent>, IEnumerable<TEntityId>> identitiesSource) =>
new(parent, session, (_, e) => identitiesSource(e), disposeAfterUse);

/// <summary>
/// Use the containing stream id as the entity identity for every event of type <typeparamref name="TEvent"/>.
/// </summary>
Expand Down Expand Up @@ -571,6 +596,87 @@ internal async Task<IAggregateCache<TEntityId, TEntity>> FetchEntitiesAsync()
}
}

/// <summary>
/// Fan-out variant of <see cref="IdentityStep{TEntity, TEvent, TEntityId}"/>: each event of type
/// <typeparamref name="TEvent"/> maps to zero or more entity identities of type
/// <typeparamref name="TEntityId"/>. The application callback is invoked once per (slice, event,
/// resolved-entity) triple for entities found in the upstream cache or loaded from storage.
/// </summary>
public class MultiIdentityStep<TEntity, TEvent, TEntityId>(
SliceGroup<TDoc, TId> parent,
IStorageOperations session,
Func<EventSlice<TDoc, TId>, IEvent<TEvent>, IEnumerable<TEntityId>> identitiesSource,
bool disposeAfterUse = false) where TEvent : notnull
{
public async Task EnrichAsync(
Action<EventSlice<TDoc, TId>, IEvent<TEvent>, TEntity> application)
{
var cache = await FetchEntitiesAsync();

foreach (EventSlice<TDoc, TId> eventSlice in parent.Slices)
{
var events = eventSlice.Events().OfType<IEvent<TEvent>>().ToArray();
foreach (var @event in events)
{
foreach (var id in identitiesSource(eventSlice, @event))
{
if (cache.TryFind(id, out var entity))
{
application(eventSlice, @event, entity);
}
}
}
}
}

public Task AddReferences()
{
return EnrichAsync((slice, _, entity) => slice.Reference(entity));
}

internal async Task<IAggregateCache<TEntityId, TEntity>> FetchEntitiesAsync()
{
try
{
var cache = parent.findCache<TEntityId, TEntity>();

var storage = await session.FetchProjectionStorageAsync<TEntity, TEntityId>(parent.TenantId, CancellationToken.None);
var ids = parent.Slices
.SelectMany(slice => slice.Events().OfType<IEvent<TEvent>>()
.SelectMany(@event => identitiesSource(slice, @event)))
.Distinct()
.ToArray();

if (!ids.Any())
{
return new NulloAggregateCache<TEntityId, TEntity>();
}

if (cache == null)
{
var dict = await storage.LoadManyAsync(ids, CancellationToken.None);
return new DictionaryAggregateCache<TEntityId, TEntity>(dict);
}

var toLoad = ids.Where(id => !cache.Contains(id)).ToArray();
if (!toLoad.Any()) return cache;

var loaded = await storage.LoadManyAsync(toLoad, CancellationToken.None);

foreach (var pair in loaded)
{
cache.Store(pair.Key, pair.Value);
}

return cache;
}
finally
{
if (disposeAfterUse) await session.DisposeAsync();
}
}
}

public class MultiEventIdentityStep<TEntity, TEntityId>(
SliceGroup<TDoc, TId> parent,
IStorageOperations session,
Expand Down
Loading