From 508dfc0344accefc1cdb45564d6f6c6e57b08ef2 Mon Sep 17 00:00:00 2001 From: Anne Erdtsieck Date: Tue, 28 Apr 2026 14:20:34 +0200 Subject: [PATCH 1/2] Add UsingStore ancillary store enrichment for Lazy (#4300) Depends on JasperFx.Events WithAlternateSession support (jasperfx/jasperfx#). - AncillaryStoreEnrichmentExtensions.UsingStore() extends the EnrichWith() chain to redirect entity lookups to a Lazy ancillary store; the session is opened lazily and disposed automatically after enrichment completes - Tenant-aware: FetchProjectionStorageAsync handles tenant routing internally - ancillary_store_enrichment_tests: end-to-end integration test verifying that a product from an ancillary store is resolved and mapped into an Order projection via the async daemon Closes https://github.com/JasperFx/marten/issues/4300 --- .../ancillary_store_enrichment_tests.cs | 145 ++++++++++++++++++ .../AncillaryStoreEnrichmentExtensions.cs | 31 ++++ 2 files changed, 176 insertions(+) create mode 100644 src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs create mode 100644 src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs diff --git a/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs b/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs new file mode 100644 index 0000000000..b96b2db2f0 --- /dev/null +++ b/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs @@ -0,0 +1,145 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Grouping; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events.Aggregation; +using Marten.Testing.Harness; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Aggregation; + +// Marker interface for the ancillary store that holds reference products +public interface IProductStore : IDocumentStore; + +public class ancillary_store_enrichment_tests : IAsyncLifetime +{ + private IHost _host = null!; + private IDocumentStore _primaryStore = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services.AddMarten(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "ancillary_enrich_primary"; + }) + .AddProjectionWithServices(ProjectionLifecycle.Async, ServiceLifetime.Singleton) + .ApplyAllDatabaseChangesOnStartup(); + + services.AddMartenStore(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "ancillary_enrich_products"; + }) + .ApplyAllDatabaseChangesOnStartup(); + }) + .StartAsync(); + + _primaryStore = _host.Services.GetRequiredService(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task enrichment_from_ancillary_store_resolves_entity_and_maps_to_projection() + { + var productStore = _host.Services.GetRequiredService(); + + // Seed a product in the ancillary store + var productId = Guid.NewGuid(); + await using (var session = productStore.LightweightSession()) + { + session.Store(new Product { Id = productId, Name = "Widget Pro" }); + await session.SaveChangesAsync(); + } + + // Append an event in the primary store that references the product + var streamId = Guid.NewGuid(); + await using (var session = _primaryStore.LightweightSession()) + { + session.Events.StartStream(streamId, new OrderPlaced { ProductId = productId }); + await session.SaveChangesAsync(); + } + + using var daemon = await _primaryStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(15.Seconds()); + await daemon.StopAllAsync(); + + await using var query = _primaryStore.QuerySession(); + var order = await query.LoadAsync(streamId); + + order.ShouldNotBeNull(); + order.ProductName.ShouldBe("Widget Pro"); + } +} + +// ── domain model ────────────────────────────────────────────────────────────── + +public class Product +{ + public Guid Id { get; set; } + public string Name { get; set; } = string.Empty; +} + +public class Order +{ + public Guid Id { get; set; } + public Guid ProductId { get; set; } + public string ProductName { get; set; } = string.Empty; +} + +public record OrderPlaced +{ + public Guid ProductId { get; init; } + public Product? Product { get; set; } +} + +// ── projection ──────────────────────────────────────────────────────────────── + +public class OrderProjection : SingleStreamProjection +{ + private readonly Lazy _productStore; + + public OrderProjection(Lazy productStore) + { + _productStore = productStore; + } + + public override async Task EnrichEventsAsync( + SliceGroup group, + IQuerySession querySession, + CancellationToken cancellation) + { + await group.EnrichWith() + .UsingStore(_productStore) + .ForEvent() + .ForEntityId(e => e.ProductId) + .EnrichAsync((slice, e, product) => + { + e.Data.Product = product; + }); + } + + public Order Apply(IEvent e, Order order) + { + order.ProductId = e.Data.ProductId; + order.ProductName = e.Data.Product?.Name ?? string.Empty; + return order; + } +} diff --git a/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs b/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs new file mode 100644 index 0000000000..e80e3649f8 --- /dev/null +++ b/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs @@ -0,0 +1,31 @@ +using System; +using JasperFx.Events; +using JasperFx.Events.Grouping; + +namespace Marten.Events.Aggregation; + +public static class AncillaryStoreEnrichmentExtensions +{ + /// + /// Switch the enrichment source to an ancillary Marten store. The store is resolved + /// lazily so projection construction does not deadlock DI. A lightweight session is + /// opened per enrichment call and disposed automatically when enrichment completes. + /// + /// + /// await group.EnrichWith<Tarief>() + /// .UsingStore(_tarievenStore) + /// .ForEvent<InvoiceCreated>() + /// .ForEntityId(e => e.TariefId) + /// .Apply((slice, e, tarief) => e.Data.TariefNaam = tarief.Naam); + /// + public static SliceGroup.EntityStep UsingStore( + this SliceGroup.EntityStep step, + Lazy ancillaryStore) + where TStore : IDocumentStore + where TDoc : notnull + where TId : notnull + { + var session = (IStorageOperations)ancillaryStore.Value.LightweightSession(); + return step.WithAlternateSession(session); + } +} From 65f5af32307cf232909358f4ce8e5bd445a46f8b Mon Sep 17 00:00:00 2001 From: Anne Erdtsieck Date: Tue, 28 Apr 2026 15:08:37 +0200 Subject: [PATCH 2/2] Wire IServiceProvider into sessions for UsingStore() enrichment (fixes #4300) - Store IServiceProvider on StoreOptions (set in AddMarten and AddMartenStore DI builders) and expose it via IStorageOperations.Services on DocumentSessionBase - Register Func in AddMartenStore() so JasperFx's UsingStore() can open a lightweight session without a Marten dependency - Remove AncillaryStore phantom-struct types; keep only the Lazy overload - Update test to use the clean .UsingStore() syntax --- .../ancillary_store_enrichment_tests.cs | 14 +++----------- .../AncillaryStoreEnrichmentExtensions.cs | 5 +++-- src/Marten/Internal/SecondaryStoreConfig.cs | 1 + .../Internal/Sessions/DocumentSessionBase.cs | 3 +++ src/Marten/MartenServiceCollectionExtensions.cs | 2 ++ src/Marten/StoreOptions.cs | 3 +++ 6 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs b/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs index b96b2db2f0..cfa80c963f 100644 --- a/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs +++ b/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs @@ -1,5 +1,4 @@ using System; -using System.Linq; using System.Threading; using System.Threading.Tasks; using JasperFx.Core; @@ -7,7 +6,6 @@ using JasperFx.Events.Grouping; using JasperFx.Events.Projections; using Marten; -using Marten.Events.Aggregation; using Marten.Testing.Harness; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -33,8 +31,8 @@ public async Task InitializeAsync() { opts.Connection(ConnectionSource.ConnectionString); opts.DatabaseSchemaName = "ancillary_enrich_primary"; + opts.Projections.Add(ProjectionLifecycle.Async); }) - .AddProjectionWithServices(ProjectionLifecycle.Async, ServiceLifetime.Singleton) .ApplyAllDatabaseChangesOnStartup(); services.AddMartenStore(opts => @@ -112,22 +110,16 @@ public record OrderPlaced // ── projection ──────────────────────────────────────────────────────────────── +// No constructor injection needed — the store is resolved from DI at enrichment time public class OrderProjection : SingleStreamProjection { - private readonly Lazy _productStore; - - public OrderProjection(Lazy productStore) - { - _productStore = productStore; - } - public override async Task EnrichEventsAsync( SliceGroup group, IQuerySession querySession, CancellationToken cancellation) { await group.EnrichWith() - .UsingStore(_productStore) + .UsingStore() .ForEvent() .ForEntityId(e => e.ProductId) .EnrichAsync((slice, e, product) => diff --git a/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs b/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs index e80e3649f8..f4b6b752b0 100644 --- a/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs +++ b/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs @@ -7,8 +7,9 @@ namespace Marten.Events.Aggregation; public static class AncillaryStoreEnrichmentExtensions { /// - /// Switch the enrichment source to an ancillary Marten store. The store is resolved - /// lazily so projection construction does not deadlock DI. A lightweight session is + /// Switch the enrichment source to an ancillary Marten store supplied as a + /// . The store is resolved only when enrichment executes + /// so projection construction does not deadlock DI. A lightweight session is /// opened per enrichment call and disposed automatically when enrichment completes. /// /// diff --git a/src/Marten/Internal/SecondaryStoreConfig.cs b/src/Marten/Internal/SecondaryStoreConfig.cs index ad9afba1f0..89cab229eb 100644 --- a/src/Marten/Internal/SecondaryStoreConfig.cs +++ b/src/Marten/Internal/SecondaryStoreConfig.cs @@ -112,6 +112,7 @@ public StoreOptions BuildStoreOptions(IServiceProvider provider) options.StoreName = typeof(T).Name; options.ReadJasperFxOptions(provider.GetService()); options.Projections.AttachServiceProvider(provider); + options.Services = provider; return options; } diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.cs index a3d5c9dc72..8a9e334361 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.cs @@ -4,6 +4,7 @@ using System.Linq; using JasperFx.Core; using JasperFx.Core.Reflection; +using JasperFx.Events; using Marten.Events; using Marten.Exceptions; using Marten.Internal.Operations; @@ -19,6 +20,8 @@ public abstract partial class DocumentSessionBase: QuerySession, IDocumentSessio internal readonly ISessionWorkTracker _workTracker; private readonly List _transactionParticipants = new(); + IServiceProvider? IStorageOperations.Services => Options.Services; + private Dictionary? _byTenant; internal DocumentSessionBase( diff --git a/src/Marten/MartenServiceCollectionExtensions.cs b/src/Marten/MartenServiceCollectionExtensions.cs index 96a550dbc5..8388ba4bac 100644 --- a/src/Marten/MartenServiceCollectionExtensions.cs +++ b/src/Marten/MartenServiceCollectionExtensions.cs @@ -179,6 +179,7 @@ Func optionSource options.InitialData.AddRange(s.GetServices()); options.Projections.AttachServiceProvider(s); + options.Services = s; return options; }); @@ -315,6 +316,7 @@ public static MartenStoreExpression AddMartenStore(this IServiceCollection services.AddSingleton(s => config.Build(s)); services.AddSingleton>(s => new Lazy(() => s.GetRequiredService())); + services.AddSingleton>(_ => store => (IStorageOperations)store.LightweightSession()); // Default keyed session factory for the ancillary store services.AddKeyedSingleton(typeof(T), (sp, _) => diff --git a/src/Marten/StoreOptions.cs b/src/Marten/StoreOptions.cs index eeb6e9890e..83416604c7 100644 --- a/src/Marten/StoreOptions.cs +++ b/src/Marten/StoreOptions.cs @@ -474,6 +474,9 @@ public ITenancy Tenancy public int CommandTimeout { get; set; } = DefaultTimeout; + // Service provider from DI — set during store construction so sessions can resolve ancillary stores + internal IServiceProvider? Services { get; set; } + // This is used to move logging into the >v7 async daemon internal ILoggerFactory? LogFactory { get; set; }