Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<PackageVersion Include="FSharp.Core" Version="9.0.100" />
<PackageVersion Include="FSharp.SystemTextJson" Version="1.3.13" />
<PackageVersion Include="JasperFx" Version="1.29.1" />
<PackageVersion Include="JasperFx.Events" Version="1.34.0" />
<PackageVersion Include="JasperFx.Events" Version="1.35.0" />
<PackageVersion Include="JasperFx.Events.SourceGenerator" Version="1.5.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
17 changes: 13 additions & 4 deletions docs/events/projections/composite.md
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,9 @@ Marten provides four supported ways for a downstream stage to consume upstream s
* **`EnrichWith<T>().ForEvent<E>().ForEntityId(...).AddReferences()`** (and the related
`EnrichAsync` overloads). These walk the upstream's in-memory aggregate cache for `T` rather
than the database, so they observe in-flight writes from earlier stages in the same batch.
Use **`ForEntityIds`** <Badge type="tip" text="JasperFx.Events 1.35" /> instead of `ForEntityId`
when a single event references several entities of the same type — see
[Fan-out enrichment](/events/projections/enrichment#fan-out-enrichment-with-forentityids).
* **`group.TryFindUpstreamCache<TId, T>(out var cache)`** <Badge type="tip" text="JasperFx.Events 1.34" /> for custom enrichment
callbacks (notably inside `EnrichUsingEntityQuery`) that need to look up an in-flight upstream
entity by id when it isn't the type of the enclosing `EnrichWith<T>`. Returns `false` when no
Expand All @@ -478,6 +481,13 @@ Direct use of `querySession.Query<T>()` from inside `EnrichEventsAsync` is appro
**static reference data committed in earlier batches** (for example the `RoutingReason` example
above) and not for documents produced by upstream stages of the *current* batch.

::: tip JasperFx.Events 1.35
Per-projection aggregate caches are no longer compacted between stages of a composite — each
stage's cache is kept at full size for the entire composite batch and trimmed as a unit at the
composite boundary. `Options.CacheLimitPerTenant` is therefore a memory tunable again, not a
correctness lever for downstream `EnrichWith<T>` / `TryFindUpstreamCache` lookups.
:::

### Looking up arbitrary upstream entities in EnrichUsingEntityQuery

`EnrichUsingEntityQuery`'s callback receives a cache parameter typed for the enclosing
Expand All @@ -490,10 +500,9 @@ captured `SliceGroup` to reach into the upstream stage's in-memory aggregate cac
<!-- endSnippet -->

`TryFindUpstreamCache` returns `false` when no upstream stage of this composite is registered as
producing entities of that type, and the cache it returns is a hint — `IAggregateCache.TryFind` may
still miss for entities outside the cache window (`Options.CacheLimitPerTenant`), in which case
the caller should fall back to whatever is appropriate for that data (a SQL query for committed
reference data, an `Updated<T>` event already in the slice, etc.).
producing entities of that type. The cache itself is kept at full size for the duration of the
composite batch, so any entity the upstream just produced is reachable by id regardless of
`Options.CacheLimitPerTenant`.

## Things to Know About Composite Projections

Expand Down
46 changes: 46 additions & 0 deletions docs/events/projections/enrichment.md
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,52 @@ public override AppointmentDetails Evolve(AppointmentDetails snapshot, Guid id,
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/DaemonTests/TeleHealth/AppointmentDetailsProjection.cs#L142-L203' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_appointmentdetails_evolve' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

### Fan-out enrichment with ForEntityIds <Badge type="tip" text="JasperFx.Events 1.35" />

`ForEntityId` resolves at most one entity per event. When a single event references *several*
entities of the same type — for example an event payload that carries a list of foreign-key ids,
or a stream-level "this thing changed; here are the related ids" notification — use `ForEntityIds`
(plural) instead. The selector returns an `IEnumerable<TEntityId>`, and `AddReferences()` emits
one `References<TEntity>` synthetic event per resolved id. Missing ids are silently skipped, just
as `IdentityStep` already does for unresolved single ids.

<!-- snippet: sample_for_entity_ids_fan_out -->
<!-- endSnippet -->

In the projection's `Evolve()`, treat the synthetic events the same way you would for the 1-to-1
case — one `case References<TEntity>` arm runs once per resolved entity:

```cs
public override OrderSummary Evolve(OrderSummary snapshot, Guid id, IEvent e)
{
switch (e.Data)
{
case OrderPlacedWithLineItems:
snapshot ??= new OrderSummary { Id = id };
break;

case References<Product> productRef:
snapshot ??= new OrderSummary { Id = id };
snapshot.Lines.Add(new OrderLineSummary
{
ProductId = productRef.Entity.Id,
Sku = productRef.Entity.Sku,
Name = productRef.Entity.Name,
Price = productRef.Entity.Price
});
snapshot.Total += productRef.Entity.Price;
break;
}

return snapshot;
}
```

`ForEntityIds` de-duplicates ids before going to storage so the same id is never loaded twice in
a single enrichment, even when several events in the slice mention it. There is also a
`ForEntityIdsFromEvent` overload that hands the `IEvent<TEvent>` wrapper to the selector for
callers that need access to event metadata (headers, sequence, timestamp).

### Enriching by business keys with EnrichUsingEntityQuery <Badge type="tip" text="8.22" />

The declarative enrichment APIs shown above work very well when events directly reference a document
Expand Down
211 changes: 211 additions & 0 deletions src/DaemonTests/Composites/Bug_4329_fan_out_and_cache_limit.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Grouping;
using Marten;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace DaemonTests.Composites;

// Stage-1 streams represent products.
public record ProductCreated(string Sku, string Name, decimal Price);

public class Product
{
public Guid Id { get; set; }
public string Sku { get; set; } = "";
public string Name { get; set; } = "";
public decimal Price { get; set; }
}

public class ProductProjection: SingleStreamProjection<Product, Guid>
{
public ProductProjection()
{
// INTENTIONALLY tiny — 1.35.0+ keeps this from leaking into downstream correctness;
// see Bug_4329_fan_out_and_cache_limit.cache_limit_one_does_not_break_downstream_lookups.
Options.CacheLimitPerTenant = 1;
}

public override Product Evolve(Product snapshot, Guid id, IEvent e)
{
switch (e.Data)
{
case ProductCreated created:
snapshot = new Product
{
Id = id,
Sku = created.Sku,
Name = created.Name,
Price = created.Price
};
break;
}

return snapshot;
}
}

// Stage-2 stream represents an order. OrderPlacedWithLineItems carries the IDs of the
// products on the order — one event with many product references.
public record OrderPlacedWithLineItems(Guid[] ProductIds);

public class OrderSummary
{
public Guid Id { get; set; }
public List<OrderLineSummary> Lines { get; set; } = [];
public decimal Total { get; set; }
}

public class OrderLineSummary
{
public Guid ProductId { get; set; }
public string Sku { get; set; } = "";
public string Name { get; set; } = "";
public decimal Price { get; set; }
}

public class OrderSummaryProjection: MultiStreamProjection<OrderSummary, Guid>
{
public OrderSummaryProjection()
{
Options.CacheLimitPerTenant = 1000;
Identity<IEvent<OrderPlacedWithLineItems>>(e => e.StreamId);
}

#region sample_for_entity_ids_fan_out

public override async Task EnrichEventsAsync(SliceGroup<OrderSummary, Guid> group,
IQuerySession querySession, CancellationToken cancellation)
{
// OrderPlacedWithLineItems carries an array of ProductIds. ForEntityIds fans out
// a single event to one References<Product> per resolved id, regardless of how
// small the upstream's CacheLimitPerTenant is — JasperFx.Events 1.35.0 keeps
// upstream caches at full size for the duration of the composite batch.
await group
.EnrichWith<Product>()
.ForEvent<OrderPlacedWithLineItems>()
.ForEntityIds(e => e.ProductIds)
.AddReferences();
}

#endregion

public override OrderSummary Evolve(OrderSummary snapshot, Guid id, IEvent e)
{
switch (e.Data)
{
case OrderPlacedWithLineItems:
snapshot ??= new OrderSummary { Id = id };
break;

case References<Product> productRef:
snapshot ??= new OrderSummary { Id = id };
snapshot.Lines.Add(new OrderLineSummary
{
ProductId = productRef.Entity.Id,
Sku = productRef.Entity.Sku,
Name = productRef.Entity.Name,
Price = productRef.Entity.Price
});
snapshot.Total += productRef.Entity.Price;
break;
}

return snapshot;
}
}

public class Bug_4329_fan_out_and_cache_limit: BugIntegrationContext
{
[Fact]
public async Task fan_out_one_event_to_many_upstream_references()
{
StoreOptions(opts =>
{
opts.Projections.CompositeProjectionFor("OrderComposite", projection =>
{
projection.Add<ProductProjection>(); // stage 1
projection.Add<OrderSummaryProjection>(2); // stage 2
});
});

// Place 5 products via individual product streams.
var skus = new[] { "A1", "B2", "C3", "D4", "E5" };
var productIds = new Guid[skus.Length];
for (var i = 0; i < skus.Length; i++)
{
productIds[i] = Guid.NewGuid();
theSession.Events.StartStream<Product>(productIds[i],
new ProductCreated(skus[i], $"Product {skus[i]}", 10m + i));
}

// One order references all 5 products in a single event.
var orderId = Guid.NewGuid();
theSession.Events.StartStream<OrderSummary>(orderId, new OrderPlacedWithLineItems(productIds));

await theSession.SaveChangesAsync();

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();
await daemon.WaitForNonStaleData(30.Seconds());

var summary = await theSession.LoadAsync<OrderSummary>(orderId);
summary.ShouldNotBeNull();
summary.Lines.Count.ShouldBe(5);
summary.Lines.Select(x => x.Sku).OrderBy(x => x).ShouldBe(skus.OrderBy(x => x));
summary.Total.ShouldBe(skus.Select((_, i) => 10m + i).Sum());
}

[Fact]
public async Task cache_limit_one_does_not_break_downstream_lookups()
{
// Regression for the second symptom from #4329: with CacheLimitPerTenant=1 on the
// upstream ProductProjection, stage 2's ForEntityIds lookups previously hit the
// database (which can't see in-flight upstream writes) for everything except the
// single cache survivor. JasperFx.Events 1.35.0 defers compaction until the entire
// composite batch finishes, so the upstream cache stays full while downstream stages
// read from it.
StoreOptions(opts =>
{
opts.Projections.CompositeProjectionFor("OrderComposite", projection =>
{
projection.Add<ProductProjection>(); // stage 1, cache = 1
projection.Add<OrderSummaryProjection>(2); // stage 2
});
});

// Many products, all in the SAME composite batch. With cache=1, only one would
// survive end-of-stage-1 compaction under the old behavior — the rest would miss
// cache, fall through to LoadManyAsync, and find nothing in the DB (uncommitted).
const int productCount = 20;
var productIds = new Guid[productCount];
for (var i = 0; i < productCount; i++)
{
productIds[i] = Guid.NewGuid();
theSession.Events.StartStream<Product>(productIds[i],
new ProductCreated($"P{i:000}", $"Product {i}", 1m * i));
}

var orderId = Guid.NewGuid();
theSession.Events.StartStream<OrderSummary>(orderId, new OrderPlacedWithLineItems(productIds));

await theSession.SaveChangesAsync();

using var daemon = await theStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();
await daemon.WaitForNonStaleData(30.Seconds());

var summary = await theSession.LoadAsync<OrderSummary>(orderId);
summary.ShouldNotBeNull();
summary.Lines.Count.ShouldBe(productCount);
}
}
Loading