diff --git a/Directory.Packages.props b/Directory.Packages.props index f0d7468fa5..9c4054cdbf 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -13,8 +13,8 @@ - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/docs/events/projections/composite.md b/docs/events/projections/composite.md index 90651d8013..85d787d021 100644 --- a/docs/events/projections/composite.md +++ b/docs/events/projections/composite.md @@ -435,6 +435,66 @@ data for that point of the event sequencing. Moreover, the composite "telehealth" projection is reading the event range *once* for all five constituent projections, and also applying the updates for all five projections at one time to guarantee consistency. +## Cross-stage document visibility + +::: warning +A downstream stage **cannot** see the document writes of an upstream stage by issuing a SQL query +against `IQuerySession` — those writes are still queued in the in-memory projection batch and have +not been committed yet. The query goes to PostgreSQL, which has not received them. +::: + +All stages of a composite projection share a single `IProjectionBatch` that is flushed to the +database **once**, after every stage has run. This is what makes the composite atomic, but it +also means that during the execution of a later stage, the document writes produced by earlier +stages are still queued in memory. A query like + +```cs +// Inside a stage-2 projection's EnrichEventsAsync — DOES NOT see Appointment +// rows written by an upstream stage-1 projection in this same batch +var appointments = await querySession.Query().ToListAsync(); +``` + +will return only what was committed by **previous** batches. During a `RebuildProjectionAsync`, +where every event is replayed from scratch, neither the upstream nor the downstream documents +have been committed yet, so the query returns an empty result for both. + +Marten provides four supported ways for a downstream stage to consume upstream stage output: + +* **`Updated` and `ProjectionDeleted` synthetic events.** When an upstream + `SingleStreamProjection` or `MultiStreamProjection` updates or deletes a document, Marten + injects a synthetic event into the downstream stage's event stream. The current snapshot of `T` + is carried directly on the event payload, so no database lookup is needed. +* **`EnrichWith().ForEvent().ForEntityId(...).AddReferences()`** (and the related + `EnrichAsync` overloads). These walk the upstream's in-memory aggregate cache for `T` rather + than the database, so they observe in-flight writes from earlier stages in the same batch. +* **`group.TryFindUpstreamCache(out var cache)`** for custom enrichment + callbacks (notably inside `EnrichUsingEntityQuery`) that need to look up an in-flight upstream + entity by id when it isn't the type of the enclosing `EnrichWith`. Returns `false` when no + upstream stage of this composite produces entities of that type — see [the example below](#looking-up-arbitrary-upstream-entities-in-enrichusingentityquery). +* **`group.ReferencePeerView()`** for a parallel projected view that shares the same identity + as the projection being built. + +Direct use of `querySession.Query()` from inside `EnrichEventsAsync` is appropriate for +**static reference data committed in earlier batches** (for example the `RoutingReason` example +above) and not for documents produced by upstream stages of the *current* batch. + +### Looking up arbitrary upstream entities in EnrichUsingEntityQuery + +`EnrichUsingEntityQuery`'s callback receives a cache parameter typed for the enclosing +`EnrichWith`. When the callback also needs to read an in-flight upstream entity of a *different* +type — for example a `RoutingReason` enrichment that needs to consult the upstream `Appointment` +that is being projected in the same batch — call `group.TryFindUpstreamCache` against the +captured `SliceGroup` to reach into the upstream stage's in-memory aggregate cache. + + + + +`TryFindUpstreamCache` returns `false` when no upstream stage of this composite is registered as +producing entities of that type, and the cache it returns is a hint — `IAggregateCache.TryFind` may +still miss for entities outside the cache window (`Options.CacheLimitPerTenant`), in which case +the caller should fall back to whatever is appropriate for that data (a SQL query for committed +reference data, an `Updated` event already in the slice, etc.). + ## Things to Know About Composite Projections * Composite projections can include any possible kind of projection including aggregations or event projections or flat table projections diff --git a/docs/events/projections/enrichment.md b/docs/events/projections/enrichment.md index eb3bcfe8b4..1e75d41a06 100644 --- a/docs/events/projections/enrichment.md +++ b/docs/events/projections/enrichment.md @@ -436,6 +436,18 @@ For these scenarios Marten provides `EnrichUsingEntityQuery`. This API gives you how referenced documents are resolved, while still fitting into the declarative enrichment pipeline and avoiding N plus 1 query problems. +::: warning +The `querySession` available to `EnrichUsingEntityQuery` only sees data that is **already committed +to the database**. This API is intended for resolving committed reference data (the `RoutingReason` +example below) — *not* for reading documents produced by an upstream stage of the same +[composite projection](/events/projections/composite#cross-stage-document-visibility) batch. Inside +a composite projection, all stages share a single `IProjectionBatch` that flushes once at the end, +so upstream stage writes are not visible to a SQL query in a downstream stage. To consume upstream +stage output use `Updated` synthetic events, `EnrichWith().AddReferences()`, +`group.TryFindUpstreamCache()` , or +`ReferencePeerView()`. See [Cross-stage document visibility](/events/projections/composite#cross-stage-document-visibility). +::: + Typical use cases include - resolving reference data by a code instead of a document id diff --git a/src/DaemonTests/Composites/Bug_4329_try_find_upstream_cache.cs b/src/DaemonTests/Composites/Bug_4329_try_find_upstream_cache.cs new file mode 100644 index 0000000000..045d06f259 --- /dev/null +++ b/src/DaemonTests/Composites/Bug_4329_try_find_upstream_cache.cs @@ -0,0 +1,154 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Grouping; +using Marten; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace DaemonTests.Composites; + +public record OrderPlaced(Guid CustomerId, decimal Total); +public record OrderShipped(string Carrier); + +public class Order +{ + public Guid Id { get; set; } + public Guid CustomerId { get; set; } + public decimal Total { get; set; } + public bool IsShipped { get; set; } +} + +public class OrderShippingNotification +{ + public Guid Id { get; set; } + public Guid CustomerId { get; set; } + public decimal OrderTotal { get; set; } + public string Carrier { get; set; } = ""; +} + +public class OrderProjection: SingleStreamProjection +{ + public OrderProjection() + { + Options.CacheLimitPerTenant = 1000; + } + + public override Order Evolve(Order snapshot, Guid id, IEvent e) + { + switch (e.Data) + { + case OrderPlaced placed: + snapshot = new Order + { + Id = id, + CustomerId = placed.CustomerId, + Total = placed.Total + }; + break; + case OrderShipped: + snapshot.IsShipped = true; + break; + } + + return snapshot; + } +} + +public class OrderShippingNotificationProjection: MultiStreamProjection +{ + public OrderShippingNotificationProjection() + { + Identity>(e => e.StreamId); + } + + #region sample_try_find_upstream_cache + public override Task EnrichEventsAsync(SliceGroup group, + IQuerySession querySession, CancellationToken cancellation) + { + // Ask the upstream OrderProjection (running earlier in the same composite stage) + // for its in-memory aggregate cache. A SQL query for Order in this same batch + // would return nothing — those writes are still queued on the shared + // IProjectionBatch and have not been committed to PostgreSQL yet. + if (!group.TryFindUpstreamCache(out var upstreamOrders)) + { + // No upstream stage in this composite is producing Order documents. + return Task.CompletedTask; + } + + foreach (var slice in group.Slices) + { + if (upstreamOrders.TryFind(slice.Id, out var order)) + { + // Stamp a synthetic References event onto the slice so that + // the Evolve method can read the upstream entity's data. + slice.Reference(order); + } + } + + return Task.CompletedTask; + } + #endregion + + public override OrderShippingNotification Evolve(OrderShippingNotification snapshot, Guid id, IEvent e) + { + switch (e.Data) + { + case OrderShipped shipped: + snapshot ??= new OrderShippingNotification { Id = id }; + snapshot.Carrier = shipped.Carrier; + break; + + case References orderRef: + snapshot ??= new OrderShippingNotification { Id = id }; + snapshot.CustomerId = orderRef.Entity.CustomerId; + snapshot.OrderTotal = orderRef.Entity.Total; + break; + } + + return snapshot; + } +} + +public class Bug_4329_try_find_upstream_cache: BugIntegrationContext +{ + [Fact] + public async Task downstream_stage_can_read_upstream_in_flight_order_via_upstream_cache() + { + StoreOptions(opts => + { + opts.Projections.CompositeProjectionFor("OrderComposite", projection => + { + projection.Add(); // stage 1 + projection.Add(2); // stage 2 + }); + }); + + var orderId = Guid.NewGuid(); + var customerId = Guid.NewGuid(); + + // Both events arrive in a single batch so the Order document is being + // produced and the OrderShipping notification is being computed in the + // same composite execution. This is the regression scenario from #4329. + theSession.Events.StartStream(orderId, + new OrderPlaced(customerId, 99.95m), + new OrderShipped("UPS")); + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(30.Seconds()); + + var notification = await theSession.LoadAsync(orderId); + notification.ShouldNotBeNull(); + notification.CustomerId.ShouldBe(customerId); + notification.OrderTotal.ShouldBe(99.95m); + notification.Carrier.ShouldBe("UPS"); + } +}