Merged
4 changes: 2 additions & 2 deletions Directory.Packages.props
@@ -13,8 +13,8 @@
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="FSharp.Core" Version="9.0.100" />
<PackageVersion Include="FSharp.SystemTextJson" Version="1.3.13" />
-<PackageVersion Include="JasperFx" Version="1.29.0" />
-<PackageVersion Include="JasperFx.Events" Version="1.33.1" />
+<PackageVersion Include="JasperFx" Version="1.29.1" />
+<PackageVersion Include="JasperFx.Events" Version="1.34.0" />
<PackageVersion Include="JasperFx.Events.SourceGenerator" Version="1.5.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
60 changes: 60 additions & 0 deletions docs/events/projections/composite.md
@@ -435,6 +435,66 @@ data for that point of the event sequencing.
Moreover, the composite "telehealth" projection is reading the event range *once* for all five constituent projections,
and also applying the updates for all five projections at one time to guarantee consistency.

## Cross-stage document visibility

::: warning
A downstream stage **cannot** see the document writes of an upstream stage by issuing a SQL query
against `IQuerySession` — those writes are still queued in the in-memory projection batch and have
not been committed yet. The query goes to PostgreSQL, which has not received them.
:::

All stages of a composite projection share a single `IProjectionBatch` that is flushed to the
database **once**, after every stage has run. This is what makes the composite atomic, but it
also means that during the execution of a later stage, the document writes produced by earlier
stages are still queued in memory. A query like

```cs
// Inside a stage-2 projection's EnrichEventsAsync — DOES NOT see Appointment
// rows written by an upstream stage-1 projection in this same batch
var appointments = await querySession.Query<Appointment>().ToListAsync();
```

will return only what was committed by **previous** batches. During a rebuild through
`RebuildProjectionAsync`, which replays every event from scratch, neither the upstream nor the
downstream documents have been committed yet, so the query returns an empty result for both.
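
The visibility rule can be modeled without Marten at all. The sketch below is a toy illustration, not Marten's implementation: `ToyDatabase`, `ToyBatch`, and `VisibilityDemo` are hypothetical names standing in for PostgreSQL, the shared projection batch, and a two-stage composite run.

```cs
using System;
using System.Collections.Generic;

// Hypothetical stand-in for PostgreSQL: only committed documents are queryable.
public class ToyDatabase
{
    private readonly List<string> _committed = new();

    public IReadOnlyList<string> Query() => _committed;

    public void Commit(IEnumerable<string> docs) => _committed.AddRange(docs);
}

// Hypothetical stand-in for the shared batch: writes queue in memory until Flush().
public class ToyBatch
{
    private readonly List<string> _pending = new();

    public void Store(string doc) => _pending.Add(doc);

    public void Flush(ToyDatabase db)
    {
        db.Commit(_pending); // AddRange copies the items before they are cleared
        _pending.Clear();
    }
}

public static class VisibilityDemo
{
    // Returns what a "stage 2" query sees before the flush, and what is visible after it.
    public static (int DuringStage2, int AfterFlush) Run()
    {
        var db = new ToyDatabase();
        var batch = new ToyBatch();

        batch.Store("Appointment#1");           // stage 1 queues a write
        var duringStage2 = db.Query().Count;    // stage 2 queries the database directly

        batch.Flush(db);                        // single flush after every stage has run
        return (duringStage2, db.Query().Count);
    }
}
```

In this model the stage-2 query runs against a store that has not yet received the queued write, which is the same reason the real query above comes back without the upstream rows.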

Marten provides four supported ways for a downstream stage to consume upstream stage output:

* **`Updated<T>` and `ProjectionDeleted<T, TId>` synthetic events.** When an upstream
`SingleStreamProjection<T>` or `MultiStreamProjection<T>` updates or deletes a document, Marten
injects a synthetic event into the downstream stage's event stream. The current snapshot of `T`
is carried directly on the event payload, so no database lookup is needed.
* **`EnrichWith<T>().ForEvent<E>().ForEntityId(...).AddReferences()`** (and the related
`EnrichAsync` overloads). These walk the upstream's in-memory aggregate cache for `T` rather
than the database, so they observe in-flight writes from earlier stages in the same batch.
* **`group.TryFindUpstreamCache<TId, T>(out var cache)`** <Badge type="tip" text="JasperFx.Events 1.34" /> for custom enrichment
callbacks (notably inside `EnrichUsingEntityQuery`) that need to look up an in-flight upstream
entity by id when it isn't the type of the enclosing `EnrichWith<T>`. Returns `false` when no
upstream stage of this composite produces entities of that type — see [the example below](#looking-up-arbitrary-upstream-entities-in-enrichusingentityquery).
* **`group.ReferencePeerView<T>()`** for a parallel projected view that shares the same identity
as the projection being built.

Direct use of `querySession.Query<T>()` from inside `EnrichEventsAsync` is appropriate for
**static reference data committed in earlier batches** (such as the `RoutingReason` lookup
above), but not for documents produced by upstream stages of the *current* batch.

### Looking up arbitrary upstream entities in EnrichUsingEntityQuery

`EnrichUsingEntityQuery`'s callback receives a cache parameter typed for the enclosing
`EnrichWith<T>`. When the callback also needs to read an in-flight upstream entity of a *different*
type — for example a `RoutingReason` enrichment that needs to consult the upstream `Appointment`
that is being projected in the same batch — call `group.TryFindUpstreamCache<TId, T>` against the
captured `SliceGroup` to reach into the upstream stage's in-memory aggregate cache.

<!-- snippet: sample_try_find_upstream_cache -->
<!-- endSnippet -->

`TryFindUpstreamCache` returns `false` when no upstream stage of this composite is registered as
producing entities of that type. Even when it returns `true`, treat the cache as a best-effort
lookup: `IAggregateCache.TryFind` may still miss for entities that fall outside the cache window
(`Options.CacheLimitPerTenant`). In that case the caller should fall back to whatever source is
appropriate for that data (a SQL query for committed reference data, an `Updated<T>` event
already in the slice, etc.).
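
The lookup-then-fall-back control flow described above can be sketched independently of Marten. `ToyOrder`, `ToyCache`, and `UpstreamLookup.Resolve` are hypothetical names, not JasperFx.Events types; a `null` cache stands in for the case where no upstream stage produces the entity type, and the `fallback` delegate stands in for whichever source the caller chooses.

```cs
#nullable enable
using System;
using System.Collections.Generic;

public record ToyOrder(Guid Id, decimal Total);

// Hypothetical in-memory cache with a TryFind-style lookup that may miss.
public class ToyCache
{
    private readonly Dictionary<Guid, ToyOrder> _items = new();

    public void Store(ToyOrder order) => _items[order.Id] = order;

    public bool TryFind(Guid id, out ToyOrder? order) => _items.TryGetValue(id, out order);
}

public static class UpstreamLookup
{
    // Prefer the in-flight cache; use the fallback when the cache is absent or misses.
    public static ToyOrder? Resolve(ToyCache? cache, Guid id, Func<Guid, ToyOrder?> fallback)
    {
        if (cache is not null && cache.TryFind(id, out var hit))
        {
            return hit;
        }

        return fallback(id);
    }
}
```

Keeping the fallback as a delegate leaves the choice of source (committed reference data, an event already on the slice, or nothing) with the caller, which matches the guidance above that the right fallback depends on the data.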

## Things to Know About Composite Projections

* Composite projections can include any possible kind of projection including aggregations or event projections or flat table projections
12 changes: 12 additions & 0 deletions docs/events/projections/enrichment.md
@@ -436,6 +436,18 @@ For these scenarios Marten provides `EnrichUsingEntityQuery`. This API gives you
how referenced documents are resolved, while still fitting into the declarative enrichment pipeline
and avoiding N plus 1 query problems.

::: warning
The `querySession` available to `EnrichUsingEntityQuery` only sees data that is **already committed
to the database**. This API is intended for resolving committed reference data (the `RoutingReason`
example below) — *not* for reading documents produced by an upstream stage of the same
[composite projection](/events/projections/composite#cross-stage-document-visibility) batch. Inside
a composite projection, all stages share a single `IProjectionBatch` that flushes once at the end,
so upstream stage writes are not visible to a SQL query in a downstream stage. To consume upstream
stage output use `Updated<T>` synthetic events, `EnrichWith<T>().AddReferences()`,
`group.TryFindUpstreamCache<TId, T>()` <Badge type="tip" text="JasperFx.Events 1.34" />, or
`ReferencePeerView<T>()`. See [Cross-stage document visibility](/events/projections/composite#cross-stage-document-visibility).
:::

Typical use cases include

- resolving reference data by a code instead of a document id
154 changes: 154 additions & 0 deletions src/DaemonTests/Composites/Bug_4329_try_find_upstream_cache.cs
@@ -0,0 +1,154 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Grouping;
using Marten;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace DaemonTests.Composites;

public record OrderPlaced(Guid CustomerId, decimal Total);
public record OrderShipped(string Carrier);

public class Order
{
    public Guid Id { get; set; }
    public Guid CustomerId { get; set; }
    public decimal Total { get; set; }
    public bool IsShipped { get; set; }
}

public class OrderShippingNotification
{
    public Guid Id { get; set; }
    public Guid CustomerId { get; set; }
    public decimal OrderTotal { get; set; }
    public string Carrier { get; set; } = "";
}

public class OrderProjection: SingleStreamProjection<Order, Guid>
{
    public OrderProjection()
    {
        Options.CacheLimitPerTenant = 1000;
    }

    public override Order Evolve(Order snapshot, Guid id, IEvent e)
    {
        switch (e.Data)
        {
            case OrderPlaced placed:
                snapshot = new Order
                {
                    Id = id,
                    CustomerId = placed.CustomerId,
                    Total = placed.Total
                };
                break;
            case OrderShipped:
                snapshot.IsShipped = true;
                break;
        }

        return snapshot;
    }
}

public class OrderShippingNotificationProjection: MultiStreamProjection<OrderShippingNotification, Guid>
{
    public OrderShippingNotificationProjection()
    {
        Identity<IEvent<OrderShipped>>(e => e.StreamId);
    }

    #region sample_try_find_upstream_cache
    public override Task EnrichEventsAsync(SliceGroup<OrderShippingNotification, Guid> group,
        IQuerySession querySession, CancellationToken cancellation)
    {
        // Ask the upstream OrderProjection (running in an earlier stage of the same
        // composite) for its in-memory aggregate cache. A SQL query for Order in this
        // same batch would return nothing, because those writes are still queued on the
        // shared IProjectionBatch and have not been committed to PostgreSQL yet.
        if (!group.TryFindUpstreamCache<Guid, Order>(out var upstreamOrders))
        {
            // No upstream stage in this composite is producing Order documents.
            return Task.CompletedTask;
        }

        foreach (var slice in group.Slices)
        {
            if (upstreamOrders.TryFind(slice.Id, out var order))
            {
                // Stamp a synthetic References<Order> event onto the slice so that
                // the Evolve method can read the upstream entity's data.
                slice.Reference(order);
            }
        }

        return Task.CompletedTask;
    }
    #endregion

    public override OrderShippingNotification Evolve(OrderShippingNotification snapshot, Guid id, IEvent e)
    {
        switch (e.Data)
        {
            case OrderShipped shipped:
                snapshot ??= new OrderShippingNotification { Id = id };
                snapshot.Carrier = shipped.Carrier;
                break;

            case References<Order> orderRef:
                snapshot ??= new OrderShippingNotification { Id = id };
                snapshot.CustomerId = orderRef.Entity.CustomerId;
                snapshot.OrderTotal = orderRef.Entity.Total;
                break;
        }

        return snapshot;
    }
}

public class Bug_4329_try_find_upstream_cache: BugIntegrationContext
{
    [Fact]
    public async Task downstream_stage_can_read_upstream_in_flight_order_via_upstream_cache()
    {
        StoreOptions(opts =>
        {
            opts.Projections.CompositeProjectionFor("OrderComposite", projection =>
            {
                projection.Add<OrderProjection>(); // stage 1
                projection.Add<OrderShippingNotificationProjection>(2); // stage 2
            });
        });

        var orderId = Guid.NewGuid();
        var customerId = Guid.NewGuid();

        // Both events arrive in a single batch, so the Order document is being
        // produced and the OrderShippingNotification document is being computed in
        // the same composite execution. This is the regression scenario from #4329.
        theSession.Events.StartStream<Order>(orderId,
            new OrderPlaced(customerId, 99.95m),
            new OrderShipped("UPS"));
        await theSession.SaveChangesAsync();

        using var daemon = await theStore.BuildProjectionDaemonAsync();
        await daemon.StartAllAsync();
        await daemon.WaitForNonStaleData(30.Seconds());

        var notification = await theSession.LoadAsync<OrderShippingNotification>(orderId);
        notification.ShouldNotBeNull();
        notification.CustomerId.ShouldBe(customerId);
        notification.OrderTotal.ShouldBe(99.95m);
        notification.Carrier.ShouldBe("UPS");
    }
}