diff --git a/src/EventTests/Projections/SliceGroupTests.cs b/src/EventTests/Projections/SliceGroupTests.cs index 8437798..77775aa 100644 --- a/src/EventTests/Projections/SliceGroupTests.cs +++ b/src/EventTests/Projections/SliceGroupTests.cs @@ -308,6 +308,115 @@ public void try_find_upstream_cache_walks_past_non_matching_upstream_executions( cache.ShouldBeSameAs(perTenantCache); } + [Fact] + public async Task for_entity_ids_fans_out_one_event_to_many_references() + { + // One event references multiple users by name. AddReferences should add a + // References for each id present in the cache, in order. + var sliceId = Guid.NewGuid(); + theGroup.Slices[sliceId] = new EventSlice(sliceId, StorageConstants.DefaultTenantId, + new[] { Event.For(new MultipleAssigned(["Bill", "Tom", "Todd"])) }); + + theUsers["Bill"] = new User("Bill", "William"); + theUsers["Tom"] = new User("Tom", "Thomas"); + theUsers["Todd"] = new User("Todd", "Todd"); + + await theGroup.EnrichWith() + .ForEvent() + .ForEntityIds(e => e.UserNames) + .AddReferences(); + + var refs = theGroup.Slices[sliceId].Events().OfType>>() + .Select(x => x.Data.Entity.UserName) + .ToArray(); + refs.ShouldBe(["Bill", "Tom", "Todd"]); + } + + [Fact] + public async Task for_entity_ids_skips_missing_ids() + { + // Only "Bill" exists; "Ghost" is not in the storage. AddReferences emits + // only the matched entity, not a placeholder for the miss. + var sliceId = Guid.NewGuid(); + theGroup.Slices[sliceId] = new EventSlice(sliceId, StorageConstants.DefaultTenantId, + new[] { Event.For(new MultipleAssigned(["Bill", "Ghost"])) }); + + theUsers["Bill"] = new User("Bill", "William"); + + await theGroup.EnrichWith() + .ForEvent() + .ForEntityIds(e => e.UserNames) + .AddReferences(); + + var refs = theGroup.Slices[sliceId].Events().OfType>>() + .Select(x => x.Data.Entity.UserName) + .ToArray(); + refs.ShouldBe(["Bill"]); + } + + [Fact] + public async Task for_entity_ids_dedupes_loads_for_same_id_across_events() + { + // Two events in the same slice both reference "Bill". The storage layer should + // see only one load for "Bill" — we dedupe ids before calling LoadManyAsync. + var sliceId = Guid.NewGuid(); + theGroup.Slices[sliceId] = new EventSlice(sliceId, StorageConstants.DefaultTenantId, + new[] + { + Event.For(new MultipleAssigned(["Bill", "Tom"])), + Event.For(new MultipleAssigned(["Bill", "Todd"])) + }); + + theUsers["Bill"] = new User("Bill", "William"); + theUsers["Tom"] = new User("Tom", "Thomas"); + theUsers["Todd"] = new User("Todd", "Todd"); + + await theGroup.EnrichWith() + .ForEvent() + .ForEntityIds(e => e.UserNames) + .AddReferences(); + + _loadedUserIds.OrderBy(x => x).ShouldBe(["Bill", "Todd", "Tom"]); + } + + [Fact] + public async Task for_entity_ids_returns_nullo_cache_when_no_ids_extracted() + { + // No events of the target type → FetchEntitiesAsync returns NulloAggregateCache + // and never touches storage. + var sliceId = Guid.NewGuid(); + theGroup.Slices[sliceId] = new EventSlice(sliceId, StorageConstants.DefaultTenantId, + new[] { Event.For(new MultipleAssigned([])) }); + + var cache = await theGroup.EnrichWith() + .ForEvent() + .ForEntityIds(e => e.UserNames) + .FetchEntitiesAsync(); + + cache.ShouldBeOfType>(); + } + + [Fact] + public async Task for_entity_ids_enrich_async_invokes_callback_per_resolved_entity() + { + // EnrichAsync fires once per (slice, event, resolved entity) — for [Bill, Ghost, Tom], + // the callback is invoked twice (Ghost is missing). + var sliceId = Guid.NewGuid(); + theGroup.Slices[sliceId] = new EventSlice(sliceId, StorageConstants.DefaultTenantId, + new[] { Event.For(new MultipleAssigned(["Bill", "Ghost", "Tom"])) }); + + theUsers["Bill"] = new User("Bill", "William"); + theUsers["Tom"] = new User("Tom", "Thomas"); + + var seen = new List(); + await theGroup.EnrichWith() + .ForEvent() + .ForEntityIds(e => e.UserNames) + .EnrichAsync((_, _, user) => seen.Add(user.UserName)); + + seen.ShouldBe(["Bill", "Tom"]); + } + [Fact] public async Task enrich_with_using_entity_query() { @@ -453,3 +562,5 @@ ValueTask IStorageOperations.GetOrStartMessageSink() } } +public record MultipleAssigned(string[] UserNames); + diff --git a/src/JasperFx.Events/Grouping/SliceGroup.cs b/src/JasperFx.Events/Grouping/SliceGroup.cs index 9f252d0..bf2f220 100644 --- a/src/JasperFx.Events/Grouping/SliceGroup.cs +++ b/src/JasperFx.Events/Grouping/SliceGroup.cs @@ -423,6 +423,31 @@ public IdentityStep ForEntityIdFromEvent( Func, TEntityId> identitySource) => new(parent, session, (_, e) => identitySource(e), disposeAfterUse); + /// + /// Specify *how* to extract a collection of identities of type + /// for entities of type from each event of type + /// . Use this for fan-out enrichment where a single event references + /// multiple entities of the same type — for example an event carrying a list of foreign-key ids. + /// + /// + /// Returns the entity ids referenced by a single event. May be empty, may contain duplicates + /// (duplicates within an event are passed through to the application callback as-is so callers + /// retain control; ids are de-duplicated when fetching from storage to avoid redundant loads). + /// + /// + public MultiIdentityStep ForEntityIds( + Func> identitiesSource) => + new(parent, session, (_, e) => identitiesSource(e.Data), disposeAfterUse); + + /// + /// Like but the + /// callback receives the wrapper rather than the unwrapped event payload, + /// so it can also reach event headers, sequence, timestamps, etc. + /// + public MultiIdentityStep ForEntityIdsFromEvent( + Func, IEnumerable> identitiesSource) => + new(parent, session, (_, e) => identitiesSource(e), disposeAfterUse); + /// /// Use the containing stream id as the entity identity for every event of type . /// @@ -571,6 +596,87 @@ internal async Task> FetchEntitiesAsync() } } + /// + /// Fan-out variant of : each event of type + /// maps to zero or more entity identities of type + /// . The application callback is invoked once per (slice, event, + /// resolved-entity) triple for entities found in the upstream cache or loaded from storage. + /// + public class MultiIdentityStep( + SliceGroup parent, + IStorageOperations session, + Func, IEvent, IEnumerable> identitiesSource, + bool disposeAfterUse = false) where TEvent : notnull + { + public async Task EnrichAsync( + Action, IEvent, TEntity> application) + { + var cache = await FetchEntitiesAsync(); + + foreach (EventSlice eventSlice in parent.Slices) + { + var events = eventSlice.Events().OfType>().ToArray(); + foreach (var @event in events) + { + foreach (var id in identitiesSource(eventSlice, @event)) + { + if (cache.TryFind(id, out var entity)) + { + application(eventSlice, @event, entity); + } + } + } + } + } + + public Task AddReferences() + { + return EnrichAsync((slice, _, entity) => slice.Reference(entity)); + } + + internal async Task> FetchEntitiesAsync() + { + try + { + var cache = parent.findCache(); + + var storage = await session.FetchProjectionStorageAsync(parent.TenantId, CancellationToken.None); + var ids = parent.Slices + .SelectMany(slice => slice.Events().OfType>() + .SelectMany(@event => identitiesSource(slice, @event))) + .Distinct() + .ToArray(); + + if (!ids.Any()) + { + return new NulloAggregateCache(); + } + + if (cache == null) + { + var dict = await storage.LoadManyAsync(ids, CancellationToken.None); + return new DictionaryAggregateCache(dict); + } + + var toLoad = ids.Where(id => !cache.Contains(id)).ToArray(); + if (!toLoad.Any()) return cache; + + var loaded = await storage.LoadManyAsync(toLoad, CancellationToken.None); + + foreach (var pair in loaded) + { + cache.Store(pair.Key, pair.Value); + } + + return cache; + } + finally + { + if (disposeAfterUse) await session.DisposeAsync(); + } + } + } + public class MultiEventIdentityStep( SliceGroup parent, IStorageOperations session,