diff --git a/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs b/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs new file mode 100644 index 0000000000..cfa80c963f --- /dev/null +++ b/src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs @@ -0,0 +1,137 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Grouping; +using JasperFx.Events.Projections; +using Marten; +using Marten.Testing.Harness; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Xunit; + +namespace EventSourcingTests.Aggregation; + +// Marker interface for the ancillary store that holds reference products +public interface IProductStore : IDocumentStore; + +public class ancillary_store_enrichment_tests : IAsyncLifetime +{ + private IHost _host = null!; + private IDocumentStore _primaryStore = null!; + + public async Task InitializeAsync() + { + _host = await Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services.AddMarten(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "ancillary_enrich_primary"; + opts.Projections.Add(ProjectionLifecycle.Async); + }) + .ApplyAllDatabaseChangesOnStartup(); + + services.AddMartenStore(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "ancillary_enrich_products"; + }) + .ApplyAllDatabaseChangesOnStartup(); + }) + .StartAsync(); + + _primaryStore = _host.Services.GetRequiredService(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task enrichment_from_ancillary_store_resolves_entity_and_maps_to_projection() + { + var productStore = _host.Services.GetRequiredService(); + + // Seed a product in the ancillary store + var productId = Guid.NewGuid(); + await using (var session = productStore.LightweightSession()) + { + session.Store(new Product { Id = productId, Name = "Widget Pro" }); + await session.SaveChangesAsync(); + } + + // Append an event in the primary store that references the product + var streamId = Guid.NewGuid(); + await using (var session = _primaryStore.LightweightSession()) + { + session.Events.StartStream(streamId, new OrderPlaced { ProductId = productId }); + await session.SaveChangesAsync(); + } + + using var daemon = await _primaryStore.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(15.Seconds()); + await daemon.StopAllAsync(); + + await using var query = _primaryStore.QuerySession(); + var order = await query.LoadAsync(streamId); + + order.ShouldNotBeNull(); + order.ProductName.ShouldBe("Widget Pro"); + } +} + +// ── domain model ────────────────────────────────────────────────────────────── + +public class Product +{ + public Guid Id { get; set; } + public string Name { get; set; } = string.Empty; +} + +public class Order +{ + public Guid Id { get; set; } + public Guid ProductId { get; set; } + public string ProductName { get; set; } = string.Empty; +} + +public record OrderPlaced +{ + public Guid ProductId { get; init; } + public Product? Product { get; set; } +} + +// ── projection ──────────────────────────────────────────────────────────────── + +// No constructor injection needed — the store is resolved from DI at enrichment time +public class OrderProjection : SingleStreamProjection +{ + public override async Task EnrichEventsAsync( + SliceGroup group, + IQuerySession querySession, + CancellationToken cancellation) + { + await group.EnrichWith() + .UsingStore() + .ForEvent() + .ForEntityId(e => e.ProductId) + .EnrichAsync((slice, e, product) => + { + e.Data.Product = product; + }); + } + + public Order Apply(IEvent e, Order order) + { + order.ProductId = e.Data.ProductId; + order.ProductName = e.Data.Product?.Name ?? string.Empty; + return order; + } +} diff --git a/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs b/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs new file mode 100644 index 0000000000..f4b6b752b0 --- /dev/null +++ b/src/Marten/Events/Aggregation/AncillaryStoreEnrichmentExtensions.cs @@ -0,0 +1,32 @@ +using System; +using JasperFx.Events; +using JasperFx.Events.Grouping; + +namespace Marten.Events.Aggregation; + +public static class AncillaryStoreEnrichmentExtensions +{ + /// + /// Switch the enrichment source to an ancillary Marten store supplied as a + /// . The store is resolved only when enrichment executes + /// so projection construction does not deadlock DI. A lightweight session is + /// opened per enrichment call and disposed automatically when enrichment completes. + /// + /// + /// await group.EnrichWith<Tarief>() + /// .UsingStore(_tarievenStore) + /// .ForEvent<InvoiceCreated>() + /// .ForEntityId(e => e.TariefId) + /// .Apply((slice, e, tarief) => e.Data.TariefNaam = tarief.Naam); + /// + public static SliceGroup.EntityStep UsingStore( + this SliceGroup.EntityStep step, + Lazy ancillaryStore) + where TStore : IDocumentStore + where TDoc : notnull + where TId : notnull + { + var session = (IStorageOperations)ancillaryStore.Value.LightweightSession(); + return step.WithAlternateSession(session); + } +} diff --git a/src/Marten/Internal/SecondaryStoreConfig.cs b/src/Marten/Internal/SecondaryStoreConfig.cs index ad9afba1f0..89cab229eb 100644 --- a/src/Marten/Internal/SecondaryStoreConfig.cs +++ b/src/Marten/Internal/SecondaryStoreConfig.cs @@ -112,6 +112,7 @@ public StoreOptions BuildStoreOptions(IServiceProvider provider) options.StoreName = typeof(T).Name; options.ReadJasperFxOptions(provider.GetService()); options.Projections.AttachServiceProvider(provider); + options.Services = provider; return options; } diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.cs index a3d5c9dc72..8a9e334361 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.cs @@ -4,6 +4,7 @@ using System.Linq; using JasperFx.Core; using JasperFx.Core.Reflection; +using JasperFx.Events; using Marten.Events; using Marten.Exceptions; using Marten.Internal.Operations; @@ -19,6 +20,8 @@ public abstract partial class DocumentSessionBase: QuerySession, IDocumentSessio internal readonly ISessionWorkTracker _workTracker; private readonly List _transactionParticipants = new(); + IServiceProvider? IStorageOperations.Services => Options.Services; + private Dictionary? _byTenant; internal DocumentSessionBase( diff --git a/src/Marten/MartenServiceCollectionExtensions.cs b/src/Marten/MartenServiceCollectionExtensions.cs index 96a550dbc5..8388ba4bac 100644 --- a/src/Marten/MartenServiceCollectionExtensions.cs +++ b/src/Marten/MartenServiceCollectionExtensions.cs @@ -179,6 +179,7 @@ Func optionSource options.InitialData.AddRange(s.GetServices()); options.Projections.AttachServiceProvider(s); + options.Services = s; return options; }); @@ -315,6 +316,7 @@ public static MartenStoreExpression AddMartenStore(this IServiceCollection services.AddSingleton(s => config.Build(s)); services.AddSingleton>(s => new Lazy(() => s.GetRequiredService())); + services.AddSingleton>(_ => store => (IStorageOperations)store.LightweightSession()); // Default keyed session factory for the ancillary store services.AddKeyedSingleton(typeof(T), (sp, _) => diff --git a/src/Marten/StoreOptions.cs b/src/Marten/StoreOptions.cs index eeb6e9890e..83416604c7 100644 --- a/src/Marten/StoreOptions.cs +++ b/src/Marten/StoreOptions.cs @@ -474,6 +474,9 @@ public ITenancy Tenancy public int CommandTimeout { get; set; } = DefaultTimeout; + // Service provider from DI — set during store construction so sessions can resolve ancillary stores + internal IServiceProvider? Services { get; set; } + // This is used to move logging into the >v7 async daemon internal ILoggerFactory? LogFactory { get; set; }