Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions src/EventSourcingTests/Aggregation/ancillary_store_enrichment_tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Events;
using JasperFx.Events.Grouping;
using JasperFx.Events.Projections;
using Marten;
using Marten.Testing.Harness;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Aggregation;

// Marker interface for the ancillary store that holds reference products
public interface IProductStore : IDocumentStore;

public class ancillary_store_enrichment_tests : IAsyncLifetime
{
private IHost _host = null!;
private IDocumentStore _primaryStore = null!;

public async Task InitializeAsync()
{
_host = await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddMarten(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ancillary_enrich_primary";
opts.Projections.Add<OrderProjection>(ProjectionLifecycle.Async);
})
.ApplyAllDatabaseChangesOnStartup();

services.AddMartenStore<IProductStore>(opts =>
{
opts.Connection(ConnectionSource.ConnectionString);
opts.DatabaseSchemaName = "ancillary_enrich_products";
})
.ApplyAllDatabaseChangesOnStartup();
})
.StartAsync();

_primaryStore = _host.Services.GetRequiredService<IDocumentStore>();
}

public async Task DisposeAsync()
{
await _host.StopAsync();
_host.Dispose();
}

[Fact]
public async Task enrichment_from_ancillary_store_resolves_entity_and_maps_to_projection()
{
var productStore = _host.Services.GetRequiredService<IProductStore>();

// Seed a product in the ancillary store
var productId = Guid.NewGuid();
await using (var session = productStore.LightweightSession())
{
session.Store(new Product { Id = productId, Name = "Widget Pro" });
await session.SaveChangesAsync();
}

// Append an event in the primary store that references the product
var streamId = Guid.NewGuid();
await using (var session = _primaryStore.LightweightSession())
{
session.Events.StartStream<Order>(streamId, new OrderPlaced { ProductId = productId });
await session.SaveChangesAsync();
}

using var daemon = await _primaryStore.BuildProjectionDaemonAsync();
await daemon.StartAllAsync();
await daemon.WaitForNonStaleData(15.Seconds());
await daemon.StopAllAsync();

await using var query = _primaryStore.QuerySession();
var order = await query.LoadAsync<Order>(streamId);

order.ShouldNotBeNull();
order.ProductName.ShouldBe("Widget Pro");
}
}

// ── domain model ──────────────────────────────────────────────────────────────

public class Product
{
public Guid Id { get; set; }
public string Name { get; set; } = string.Empty;
}

public class Order
{
public Guid Id { get; set; }
public Guid ProductId { get; set; }
public string ProductName { get; set; } = string.Empty;
}

public record OrderPlaced
{
public Guid ProductId { get; init; }
public Product? Product { get; set; }
}

// ── projection ────────────────────────────────────────────────────────────────

// No constructor injection needed — the store is resolved from DI at enrichment time
public class OrderProjection : SingleStreamProjection<Order, Guid>
{
public override async Task EnrichEventsAsync(
SliceGroup<Order, Guid> group,
IQuerySession querySession,
CancellationToken cancellation)
{
await group.EnrichWith<Product>()
.UsingStore<IProductStore>()
.ForEvent<OrderPlaced>()
.ForEntityId(e => e.ProductId)
.EnrichAsync((slice, e, product) =>
{
e.Data.Product = product;
});
}

public Order Apply(IEvent<OrderPlaced> e, Order order)
{
order.ProductId = e.Data.ProductId;
order.ProductName = e.Data.Product?.Name ?? string.Empty;
return order;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using System;
using JasperFx.Events;
using JasperFx.Events.Grouping;

namespace Marten.Events.Aggregation;

public static class AncillaryStoreEnrichmentExtensions
{
/// <summary>
/// Switch the enrichment source to an ancillary Marten store supplied as a
/// <see cref="Lazy{T}"/>. The store is resolved only when enrichment executes
/// so projection construction does not deadlock DI. A lightweight session is
/// opened per enrichment call and disposed automatically when enrichment completes.
/// </summary>
/// <example>
/// await group.EnrichWith&lt;Tarief&gt;()
/// .UsingStore(_tarievenStore)
/// .ForEvent&lt;InvoiceCreated&gt;()
/// .ForEntityId(e => e.TariefId)
/// .Apply((slice, e, tarief) => e.Data.TariefNaam = tarief.Naam);
/// </example>
public static SliceGroup<TDoc, TId>.EntityStep<TEntity> UsingStore<TEntity, TStore, TDoc, TId>(
this SliceGroup<TDoc, TId>.EntityStep<TEntity> step,
Lazy<TStore> ancillaryStore)
where TStore : IDocumentStore
where TDoc : notnull
where TId : notnull
{
var session = (IStorageOperations)ancillaryStore.Value.LightweightSession();
return step.WithAlternateSession(session);
}
}
1 change: 1 addition & 0 deletions src/Marten/Internal/SecondaryStoreConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public StoreOptions BuildStoreOptions(IServiceProvider provider)
options.StoreName = typeof(T).Name;
options.ReadJasperFxOptions(provider.GetService<JasperFxOptions>());
options.Projections.AttachServiceProvider(provider);
options.Services = provider;

return options;
}
Expand Down
3 changes: 3 additions & 0 deletions src/Marten/Internal/Sessions/DocumentSessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Linq;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using JasperFx.Events;
using Marten.Events;
using Marten.Exceptions;
using Marten.Internal.Operations;
Expand All @@ -19,6 +20,8 @@ public abstract partial class DocumentSessionBase: QuerySession, IDocumentSessio
internal readonly ISessionWorkTracker _workTracker;
private readonly List<ITransactionParticipant> _transactionParticipants = new();

IServiceProvider? IStorageOperations.Services => Options.Services;

private Dictionary<string, NestedTenantSession>? _byTenant;

internal DocumentSessionBase(
Expand Down
2 changes: 2 additions & 0 deletions src/Marten/MartenServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ Func<IServiceProvider, StoreOptions> optionSource

options.InitialData.AddRange(s.GetServices<IInitialData>());
options.Projections.AttachServiceProvider(s);
options.Services = s;

return options;
});
Expand Down Expand Up @@ -315,6 +316,7 @@ public static MartenStoreExpression<T> AddMartenStore<T>(this IServiceCollection

services.AddSingleton<T>(s => config.Build(s));
services.AddSingleton<Lazy<T>>(s => new Lazy<T>(() => s.GetRequiredService<T>()));
services.AddSingleton<Func<T, IStorageOperations>>(_ => store => (IStorageOperations)store.LightweightSession());

// Default keyed session factory for the ancillary store
services.AddKeyedSingleton<ISessionFactory>(typeof(T), (sp, _) =>
Expand Down
3 changes: 3 additions & 0 deletions src/Marten/StoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,9 @@ public ITenancy Tenancy

public int CommandTimeout { get; set; } = DefaultTimeout;

// Service provider from DI — set during store construction so sessions can resolve ancillary stores
internal IServiceProvider? Services { get; set; }

// This is used to move logging into the >v7 async daemon
internal ILoggerFactory? LogFactory { get; set; }

Expand Down
Loading