diff --git a/Directory.Packages.props b/Directory.Packages.props index 9c4054cdbf..68756d7b32 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -14,7 +14,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/docs/events/projections/composite.md b/docs/events/projections/composite.md index 85d787d021..0a3af914b6 100644 --- a/docs/events/projections/composite.md +++ b/docs/events/projections/composite.md @@ -467,6 +467,9 @@ Marten provides four supported ways for a downstream stage to consume upstream s * **`EnrichWith().ForEvent().ForEntityId(...).AddReferences()`** (and the related `EnrichAsync` overloads). These walk the upstream's in-memory aggregate cache for `T` rather than the database, so they observe in-flight writes from earlier stages in the same batch. + Use **`ForEntityIds`** instead of `ForEntityId` + when a single event references several entities of the same type — see + [Fan-out enrichment](/events/projections/enrichment#fan-out-enrichment-with-forentityids). * **`group.TryFindUpstreamCache(out var cache)`** for custom enrichment callbacks (notably inside `EnrichUsingEntityQuery`) that need to look up an in-flight upstream entity by id when it isn't the type of the enclosing `EnrichWith`. Returns `false` when no @@ -478,6 +481,13 @@ Direct use of `querySession.Query()` from inside `EnrichEventsAsync` is appro **static reference data committed in earlier batches** (for example the `RoutingReason` example above) and not for documents produced by upstream stages of the *current* batch. +::: tip JasperFx.Events 1.35 +Per-projection aggregate caches are no longer compacted between stages of a composite — each +stage's cache is kept at full size for the entire composite batch and trimmed as a unit at the +composite boundary. `Options.CacheLimitPerTenant` is therefore a memory tunable again, not a +correctness lever for downstream `EnrichWith` / `TryFindUpstreamCache` lookups. +::: + ### Looking up arbitrary upstream entities in EnrichUsingEntityQuery `EnrichUsingEntityQuery`'s callback receives a cache parameter typed for the enclosing @@ -490,10 +500,9 @@ captured `SliceGroup` to reach into the upstream stage's in-memory aggregate cac `TryFindUpstreamCache` returns `false` when no upstream stage of this composite is registered as -producing entities of that type, and the cache it returns is a hint — `IAggregateCache.TryFind` may -still miss for entities outside the cache window (`Options.CacheLimitPerTenant`), in which case -the caller should fall back to whatever is appropriate for that data (a SQL query for committed -reference data, an `Updated` event already in the slice, etc.). +producing entities of that type. The cache itself is kept at full size for the duration of the +composite batch, so any entity the upstream just produced is reachable by id regardless of +`Options.CacheLimitPerTenant`. ## Things to Know About Composite Projections diff --git a/docs/events/projections/enrichment.md b/docs/events/projections/enrichment.md index 1e75d41a06..9fa8ad2d25 100644 --- a/docs/events/projections/enrichment.md +++ b/docs/events/projections/enrichment.md @@ -425,6 +425,52 @@ public override AppointmentDetails Evolve(AppointmentDetails snapshot, Guid id, snippet source | anchor +### Fan-out enrichment with ForEntityIds + +`ForEntityId` resolves at most one entity per event. When a single event references *several* +entities of the same type — for example an event payload that carries a list of foreign-key ids, +or a stream-level "this thing changed; here are the related ids" notification — use `ForEntityIds` +(plural) instead. The selector returns an `IEnumerable`, and `AddReferences()` emits +one `References` synthetic event per resolved id. Missing ids are silently skipped, just +as `IdentityStep` already does for unresolved single ids. + + + + +In the projection's `Evolve()`, treat the synthetic events the same way you would for the 1-to-1 +case — one `case References` arm runs once per resolved entity: + +```cs +public override OrderSummary Evolve(OrderSummary snapshot, Guid id, IEvent e) +{ + switch (e.Data) + { + case OrderPlacedWithLineItems: + snapshot ??= new OrderSummary { Id = id }; + break; + + case References productRef: + snapshot ??= new OrderSummary { Id = id }; + snapshot.Lines.Add(new OrderLineSummary + { + ProductId = productRef.Entity.Id, + Sku = productRef.Entity.Sku, + Name = productRef.Entity.Name, + Price = productRef.Entity.Price + }); + snapshot.Total += productRef.Entity.Price; + break; + } + + return snapshot; +} +``` + +`ForEntityIds` de-duplicates ids before going to storage so the same id is never loaded twice in +a single enrichment, even when several events in the slice mention it. There is also a +`ForEntityIdsFromEvent` overload that hands the `IEvent` wrapper to the selector for +callers that need access to event metadata (headers, sequence, timestamp). + ### Enriching by business keys with EnrichUsingEntityQuery The declarative enrichment APIs shown above work very well when events directly reference a document diff --git a/src/DaemonTests/Composites/Bug_4329_fan_out_and_cache_limit.cs b/src/DaemonTests/Composites/Bug_4329_fan_out_and_cache_limit.cs new file mode 100644 index 0000000000..61e183fd68 --- /dev/null +++ b/src/DaemonTests/Composites/Bug_4329_fan_out_and_cache_limit.cs @@ -0,0 +1,211 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Grouping; +using Marten; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace DaemonTests.Composites; + +// Stage-1 streams represent products. +public record ProductCreated(string Sku, string Name, decimal Price); + +public class Product +{ + public Guid Id { get; set; } + public string Sku { get; set; } = ""; + public string Name { get; set; } = ""; + public decimal Price { get; set; } +} + +public class ProductProjection: SingleStreamProjection +{ + public ProductProjection() + { + // INTENTIONALLY tiny — 1.35.0+ keeps this from leaking into downstream correctness; + // see Bug_4329_fan_out_and_cache_limit.cache_limit_one_does_not_break_downstream_lookups. + Options.CacheLimitPerTenant = 1; + } + + public override Product Evolve(Product snapshot, Guid id, IEvent e) + { + switch (e.Data) + { + case ProductCreated created: + snapshot = new Product + { + Id = id, + Sku = created.Sku, + Name = created.Name, + Price = created.Price + }; + break; + } + + return snapshot; + } +} + +// Stage-2 stream represents an order. OrderPlacedWithLineItems carries the IDs of the +// products on the order — one event with many product references. +public record OrderPlacedWithLineItems(Guid[] ProductIds); + +public class OrderSummary +{ + public Guid Id { get; set; } + public List Lines { get; set; } = []; + public decimal Total { get; set; } +} + +public class OrderLineSummary +{ + public Guid ProductId { get; set; } + public string Sku { get; set; } = ""; + public string Name { get; set; } = ""; + public decimal Price { get; set; } +} + +public class OrderSummaryProjection: MultiStreamProjection +{ + public OrderSummaryProjection() + { + Options.CacheLimitPerTenant = 1000; + Identity>(e => e.StreamId); + } + + #region sample_for_entity_ids_fan_out + + public override async Task EnrichEventsAsync(SliceGroup group, + IQuerySession querySession, CancellationToken cancellation) + { + // OrderPlacedWithLineItems carries an array of ProductIds. ForEntityIds fans out + // a single event to one References per resolved id, regardless of how + // small the upstream's CacheLimitPerTenant is — JasperFx.Events 1.35.0 keeps + // upstream caches at full size for the duration of the composite batch. + await group + .EnrichWith() + .ForEvent() + .ForEntityIds(e => e.ProductIds) + .AddReferences(); + } + + #endregion + + public override OrderSummary Evolve(OrderSummary snapshot, Guid id, IEvent e) + { + switch (e.Data) + { + case OrderPlacedWithLineItems: + snapshot ??= new OrderSummary { Id = id }; + break; + + case References productRef: + snapshot ??= new OrderSummary { Id = id }; + snapshot.Lines.Add(new OrderLineSummary + { + ProductId = productRef.Entity.Id, + Sku = productRef.Entity.Sku, + Name = productRef.Entity.Name, + Price = productRef.Entity.Price + }); + snapshot.Total += productRef.Entity.Price; + break; + } + + return snapshot; + } +} + +public class Bug_4329_fan_out_and_cache_limit: BugIntegrationContext +{ + [Fact] + public async Task fan_out_one_event_to_many_upstream_references() + { + StoreOptions(opts => + { + opts.Projections.CompositeProjectionFor("OrderComposite", projection => + { + projection.Add(); // stage 1 + projection.Add(2); // stage 2 + }); + }); + + // Place 5 products via individual product streams. + var skus = new[] { "A1", "B2", "C3", "D4", "E5" }; + var productIds = new Guid[skus.Length]; + for (var i = 0; i < skus.Length; i++) + { + productIds[i] = Guid.NewGuid(); + theSession.Events.StartStream(productIds[i], + new ProductCreated(skus[i], $"Product {skus[i]}", 10m + i)); + } + + // One order references all 5 products in a single event. + var orderId = Guid.NewGuid(); + theSession.Events.StartStream(orderId, new OrderPlacedWithLineItems(productIds)); + + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(30.Seconds()); + + var summary = await theSession.LoadAsync(orderId); + summary.ShouldNotBeNull(); + summary.Lines.Count.ShouldBe(5); + summary.Lines.Select(x => x.Sku).OrderBy(x => x).ShouldBe(skus.OrderBy(x => x)); + summary.Total.ShouldBe(skus.Select((_, i) => 10m + i).Sum()); + } + + [Fact] + public async Task cache_limit_one_does_not_break_downstream_lookups() + { + // Regression for the second symptom from #4329: with CacheLimitPerTenant=1 on the + // upstream ProductProjection, stage 2's ForEntityIds lookups previously hit the + // database (which can't see in-flight upstream writes) for everything except the + // single cache survivor. JasperFx.Events 1.35.0 defers compaction until the entire + // composite batch finishes, so the upstream cache stays full while downstream stages + // read from it. + StoreOptions(opts => + { + opts.Projections.CompositeProjectionFor("OrderComposite", projection => + { + projection.Add(); // stage 1, cache = 1 + projection.Add(2); // stage 2 + }); + }); + + // Many products, all in the SAME composite batch. With cache=1, only one would + // survive end-of-stage-1 compaction under the old behavior — the rest would miss + // cache, fall through to LoadManyAsync, and find nothing in the DB (uncommitted). + const int productCount = 20; + var productIds = new Guid[productCount]; + for (var i = 0; i < productCount; i++) + { + productIds[i] = Guid.NewGuid(); + theSession.Events.StartStream(productIds[i], + new ProductCreated($"P{i:000}", $"Product {i}", 1m * i)); + } + + var orderId = Guid.NewGuid(); + theSession.Events.StartStream(orderId, new OrderPlacedWithLineItems(productIds)); + + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(30.Seconds()); + + var summary = await theSession.LoadAsync(orderId); + summary.ShouldNotBeNull(); + summary.Lines.Count.ShouldBe(productCount); + } +}