From 2fd51cbceed1c894d3c855c408508c638ef6210f Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Wed, 6 May 2026 12:07:47 -0500 Subject: [PATCH 1/2] Document cross-stage document visibility in composite projections Adds a "Cross-stage document visibility" section to composite.md explaining that downstream stages cannot SQL-query the in-flight writes of upstream stages in the same composite batch, with pointers to the supported alternatives (Updated, EnrichWith, ReferencePeerView). Cross-links a warning into the EnrichUsingEntityQuery section in enrichment.md so users hit the guidance at the API surface where the trap is easiest to fall into. Refs #4329. Co-Authored-By: Claude Opus 4.7 (1M context) --- docs/events/projections/composite.md | 39 +++++++++++++++++++++++++++ docs/events/projections/enrichment.md | 11 ++++++++ 2 files changed, 50 insertions(+) diff --git a/docs/events/projections/composite.md b/docs/events/projections/composite.md index 90651d8013..99e3e14792 100644 --- a/docs/events/projections/composite.md +++ b/docs/events/projections/composite.md @@ -435,6 +435,45 @@ data for that point of the event sequencing. Moreover, the composite "telehealth" projection is reading the event range *once* for all five constituent projections, and also applying the updates for all five projections at one time to guarantee consistency. +## Cross-stage document visibility + +::: warning +A downstream stage **cannot** see the document writes of an upstream stage by issuing a SQL query +against `IQuerySession` — those writes are still queued in the in-memory projection batch and have +not been committed yet. The query goes to PostgreSQL, which has not received them. +::: + +All stages of a composite projection share a single `IProjectionBatch` that is flushed to the +database **once**, after every stage has run. 
This is what makes the composite atomic, but it +also means that during the execution of a later stage, the document writes produced by earlier +stages are still queued in memory. A query like + +```cs +// Inside a stage-2 projection's EnrichEventsAsync — DOES NOT see Appointment +// rows written by an upstream stage-1 projection in this same batch +var appointments = await querySession.Query<Appointment>().ToListAsync(); +``` + +will return only what was committed by **previous** batches. During a `RebuildProjectionAsync`, +where every event is replayed from scratch, neither the upstream nor the downstream documents +have been committed yet, so the query returns an empty result for both. + +Marten provides three supported ways for a downstream stage to consume upstream stage output: + +* **`Updated` and `ProjectionDeleted` synthetic events.** When an upstream + `SingleStreamProjection` or `MultiStreamProjection` updates or deletes a document, Marten + injects a synthetic event into the downstream stage's event stream. The current snapshot of `T` + is carried directly on the event payload, so no database lookup is needed. +* **`EnrichWith().ForEvent().ForEntityId(...).AddReferences()`** (and the related + `EnrichAsync` overloads). These walk the upstream's in-memory aggregate cache for `T` rather + than the database, so they observe in-flight writes from earlier stages in the same batch. +* **`group.ReferencePeerView()`** for a parallel projected view that shares the same identity + as the projection being built. + +Direct use of `querySession.Query()` from inside `EnrichEventsAsync` is appropriate for +**static reference data committed in earlier batches** (for example the `RoutingReason` example +above) and not for documents produced by upstream stages of the *current* batch. 
+ ## Things to Know About Composite Projections * Composite projections can include any possible kind of projection including aggregations or event projections or flat table projections diff --git a/docs/events/projections/enrichment.md b/docs/events/projections/enrichment.md index eb3bcfe8b4..134fbb25f1 100644 --- a/docs/events/projections/enrichment.md +++ b/docs/events/projections/enrichment.md @@ -436,6 +436,17 @@ For these scenarios Marten provides `EnrichUsingEntityQuery`. This API gives you how referenced documents are resolved, while still fitting into the declarative enrichment pipeline and avoiding N plus 1 query problems. +::: warning +The `querySession` available to `EnrichUsingEntityQuery` only sees data that is **already committed +to the database**. This API is intended for resolving committed reference data (the `RoutingReason` +example below) — *not* for reading documents produced by an upstream stage of the same +[composite projection](/events/projections/composite#cross-stage-document-visibility) batch. Inside +a composite projection, all stages share a single `IProjectionBatch` that flushes once at the end, +so upstream stage writes are not visible to a SQL query in a downstream stage. To consume upstream +stage output use `Updated` synthetic events, `EnrichWith().AddReferences()`, or +`ReferencePeerView()`. See [Cross-stage document visibility](/events/projections/composite#cross-stage-document-visibility). +::: + Typical use cases include - resolving reference data by a code instead of a document id From 563da019fb4c13dfef93feaae12c111dba79a347 Mon Sep 17 00:00:00 2001 From: "Jeremy D. 
Miller" Date: Wed, 6 May 2026 12:54:21 -0500 Subject: [PATCH 2/2] Add TryFindUpstreamCache pattern: bump JasperFx.Events to 1.34.0, integration test, docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit JasperFx.Events 1.34.0 ships a public SliceGroup.TryFindUpstreamCache that lets a custom enrichment callback look up an upstream stage's in-memory aggregate cache for arbitrary entity types — the previously-only-internal hook that EnrichWith().AddReferences() relies on. This commit: - Bumps JasperFx 1.29.0 → 1.29.1 and JasperFx.Events 1.33.1 → 1.34.0 in Directory.Packages.props. - Adds Bug_4329_try_find_upstream_cache integration test that runs the exact shape from #4329 (single composite batch where stage 1 produces an Order and stage 2 needs to read it). Without this API the only way for stage 2 to read upstream Order data inside a custom EnrichEventsAsync would be to query SQL (which returns empty) or to listen for Updated; with the new API the downstream stage can pull the in-flight Order from the upstream cache by id. - Wires the new option into the "Cross-stage document visibility" section of composite.md and the EnrichUsingEntityQuery warning in enrichment.md, marked with a JasperFx.Events 1.34 badge. The composite.md sample is sourced from #region sample_try_find_upstream_cache in the new test file. Refs #4329, JasperFx/jasperfx#205. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- Directory.Packages.props | 4 +- docs/events/projections/composite.md | 23 ++- docs/events/projections/enrichment.md | 3 +- .../Bug_4329_try_find_upstream_cache.cs | 154 ++++++++++++++++++ 4 files changed, 180 insertions(+), 4 deletions(-) create mode 100644 src/DaemonTests/Composites/Bug_4329_try_find_upstream_cache.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index f0d7468fa5..9c4054cdbf 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -13,8 +13,8 @@ - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/docs/events/projections/composite.md b/docs/events/projections/composite.md index 99e3e14792..85d787d021 100644 --- a/docs/events/projections/composite.md +++ b/docs/events/projections/composite.md @@ -458,7 +458,7 @@ will return only what was committed by **previous** batches. During a `RebuildPr where every event is replayed from scratch, neither the upstream nor the downstream documents have been committed yet, so the query returns an empty result for both. -Marten provides three supported ways for a downstream stage to consume upstream stage output: +Marten provides four supported ways for a downstream stage to consume upstream stage output: * **`Updated` and `ProjectionDeleted` synthetic events.** When an upstream `SingleStreamProjection` or `MultiStreamProjection` updates or deletes a document, Marten @@ -467,6 +467,10 @@ Marten provides three supported ways for a downstream stage to consume upstream * **`EnrichWith().ForEvent().ForEntityId(...).AddReferences()`** (and the related `EnrichAsync` overloads). These walk the upstream's in-memory aggregate cache for `T` rather than the database, so they observe in-flight writes from earlier stages in the same batch. 
+* **`group.TryFindUpstreamCache(out var cache)`** for custom enrichment + callbacks (notably inside `EnrichUsingEntityQuery`) that need to look up an in-flight upstream + entity by id when it isn't the type of the enclosing `EnrichWith`. Returns `false` when no + upstream stage of this composite produces entities of that type — see [the example below](#looking-up-arbitrary-upstream-entities-in-enrichusingentityquery). * **`group.ReferencePeerView()`** for a parallel projected view that shares the same identity as the projection being built. @@ -474,6 +478,23 @@ Direct use of `querySession.Query()` from inside `EnrichEventsAsync` is appro **static reference data committed in earlier batches** (for example the `RoutingReason` example above) and not for documents produced by upstream stages of the *current* batch. +### Looking up arbitrary upstream entities in EnrichUsingEntityQuery + +`EnrichUsingEntityQuery`'s callback receives a cache parameter typed for the enclosing +`EnrichWith`. When the callback also needs to read an in-flight upstream entity of a *different* +type — for example a `RoutingReason` enrichment that needs to consult the upstream `Appointment` +that is being projected in the same batch — call `group.TryFindUpstreamCache` against the +captured `SliceGroup` to reach into the upstream stage's in-memory aggregate cache. + +<!-- snippet: sample_try_find_upstream_cache --> +<!-- endSnippet --> + +`TryFindUpstreamCache` returns `false` when no upstream stage of this composite is registered as +producing entities of that type, and the cache it returns is a hint — `IAggregateCache.TryFind` may +still miss for entities outside the cache window (`Options.CacheLimitPerTenant`), in which case +the caller should fall back to whatever is appropriate for that data (a SQL query for committed +reference data, an `Updated` event already in the slice, etc.). 
+ ## Things to Know About Composite Projections * Composite projections can include any possible kind of projection including aggregations or event projections or flat table projections diff --git a/docs/events/projections/enrichment.md b/docs/events/projections/enrichment.md index 134fbb25f1..1e75d41a06 100644 --- a/docs/events/projections/enrichment.md +++ b/docs/events/projections/enrichment.md @@ -443,7 +443,8 @@ example below) — *not* for reading documents produced by an upstream stage of [composite projection](/events/projections/composite#cross-stage-document-visibility) batch. Inside a composite projection, all stages share a single `IProjectionBatch` that flushes once at the end, so upstream stage writes are not visible to a SQL query in a downstream stage. To consume upstream -stage output use `Updated` synthetic events, `EnrichWith().AddReferences()`, or +stage output use `Updated` synthetic events, `EnrichWith().AddReferences()`, +`group.TryFindUpstreamCache()`, or `ReferencePeerView()`. See [Cross-stage document visibility](/events/projections/composite#cross-stage-document-visibility). 
::: diff --git a/src/DaemonTests/Composites/Bug_4329_try_find_upstream_cache.cs b/src/DaemonTests/Composites/Bug_4329_try_find_upstream_cache.cs new file mode 100644 index 0000000000..045d06f259 --- /dev/null +++ b/src/DaemonTests/Composites/Bug_4329_try_find_upstream_cache.cs @@ -0,0 +1,154 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Grouping; +using Marten; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace DaemonTests.Composites; + +public record OrderPlaced(Guid CustomerId, decimal Total); +public record OrderShipped(string Carrier); + +public class Order +{ + public Guid Id { get; set; } + public Guid CustomerId { get; set; } + public decimal Total { get; set; } + public bool IsShipped { get; set; } +} + +public class OrderShippingNotification +{ + public Guid Id { get; set; } + public Guid CustomerId { get; set; } + public decimal OrderTotal { get; set; } + public string Carrier { get; set; } = ""; +} + +public class OrderProjection: SingleStreamProjection +{ + public OrderProjection() + { + Options.CacheLimitPerTenant = 1000; + } + + public override Order Evolve(Order snapshot, Guid id, IEvent e) + { + switch (e.Data) + { + case OrderPlaced placed: + snapshot = new Order + { + Id = id, + CustomerId = placed.CustomerId, + Total = placed.Total + }; + break; + case OrderShipped: + snapshot.IsShipped = true; + break; + } + + return snapshot; + } +} + +public class OrderShippingNotificationProjection: MultiStreamProjection +{ + public OrderShippingNotificationProjection() + { + Identity>(e => e.StreamId); + } + + #region sample_try_find_upstream_cache + public override Task EnrichEventsAsync(SliceGroup group, + IQuerySession querySession, CancellationToken cancellation) + { + // Ask the upstream OrderProjection (running in an earlier stage of the same composite) 
+ // for its in-memory aggregate cache. A SQL query for Order in this same batch + // would return nothing — those writes are still queued on the shared + // IProjectionBatch and have not been committed to PostgreSQL yet. + if (!group.TryFindUpstreamCache(out var upstreamOrders)) + { + // No upstream stage in this composite is producing Order documents. + return Task.CompletedTask; + } + + foreach (var slice in group.Slices) + { + if (upstreamOrders.TryFind(slice.Id, out var order)) + { + // Stamp a synthetic References event onto the slice so that + // the Evolve method can read the upstream entity's data. + slice.Reference(order); + } + } + + return Task.CompletedTask; + } + #endregion + + public override OrderShippingNotification Evolve(OrderShippingNotification snapshot, Guid id, IEvent e) + { + switch (e.Data) + { + case OrderShipped shipped: + snapshot ??= new OrderShippingNotification { Id = id }; + snapshot.Carrier = shipped.Carrier; + break; + + case References orderRef: + snapshot ??= new OrderShippingNotification { Id = id }; + snapshot.CustomerId = orderRef.Entity.CustomerId; + snapshot.OrderTotal = orderRef.Entity.Total; + break; + } + + return snapshot; + } +} + +public class Bug_4329_try_find_upstream_cache: BugIntegrationContext +{ + [Fact] + public async Task downstream_stage_can_read_upstream_in_flight_order_via_upstream_cache() + { + StoreOptions(opts => + { + opts.Projections.CompositeProjectionFor("OrderComposite", projection => + { + projection.Add(); // stage 1 + projection.Add(2); // stage 2 + }); + }); + + var orderId = Guid.NewGuid(); + var customerId = Guid.NewGuid(); + + // Both events arrive in a single batch so the Order document is being + // produced and the OrderShipping notification is being computed in the + // same composite execution. This is the regression scenario from #4329. 
+ theSession.Events.StartStream(orderId, + new OrderPlaced(customerId, 99.95m), + new OrderShipped("UPS")); + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(30.Seconds()); + + var notification = await theSession.LoadAsync(orderId); + notification.ShouldNotBeNull(); + notification.CustomerId.ShouldBe(customerId); + notification.OrderTotal.ShouldBe(99.95m); + notification.Carrier.ShouldBe("UPS"); + } +}