diff --git a/src/DaemonTests/Composites/Feature_4284_composite_projection_with_services.cs b/src/DaemonTests/Composites/Feature_4284_composite_projection_with_services.cs new file mode 100644 index 0000000000..00868e0be5 --- /dev/null +++ b/src/DaemonTests/Composites/Feature_4284_composite_projection_with_services.cs @@ -0,0 +1,147 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using Marten; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Testing.Harness; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Xunit; + +namespace DaemonTests.Composites; + +public class Feature_4284_composite_projection_with_services +{ + [Fact] + public async Task can_use_scoped_services_in_projection_registered_under_composite() + { + using var host = await Host.CreateDefaultBuilder() + .ConfigureServices(services => + { + services.AddScoped(); + + services.AddMarten(opts => + { + opts.Connection(ConnectionSource.ConnectionString); + opts.DatabaseSchemaName = "feature_4284_net" + Environment.Version.Major; + opts.Projections.CompositeProjectionFor("Feature4284Products", projection => + { + projection.AddProjectionWithServices(ServiceLifetime.Scoped); + projection.AddProjectionWithServices(ServiceLifetime.Scoped); + }); + }) + .ApplyAllDatabaseChangesOnStartup(); + }) + .StartAsync(); + + var store = host.Services.GetRequiredService(); + await store.Advanced.Clean.CompletelyRemoveAllAsync(); + + await using var session = store.LightweightSession(); + var streamId = session.Events.StartStream( + new CompositeProductRegistered("Ankle Socks", "Socks")).Id; + await session.SaveChangesAsync(); + + using var daemon = await store.BuildProjectionDaemonAsync(); + await daemon.StartAllAsync(); + await daemon.WaitForNonStaleData(10.Seconds()); + await daemon.StopAllAsync(); + + var product = await session.LoadAsync(streamId); + product.ShouldNotBeNull(); + product.Name.ShouldBe("Ankle Socks"); + product.Category.ShouldBe("Socks"); + product.Price.ShouldBe(12.5); + + var metric = await session.LoadAsync(streamId); + metric.ShouldNotBeNull(); + metric.Price.ShouldBe(12.5); + } +} + +public interface ICompositePriceLookup +{ + double PriceFor(string category); +} + +public class CompositePriceLookup: ICompositePriceLookup +{ + public double PriceFor(string category) + { + return category == "Socks" ? 12.5 : 5; + } +} + +public class CompositeProduct +{ + public Guid Id { get; set; } + public double Price { get; set; } + public string Category { get; set; } + public string Name { get; set; } +} + +public record CompositeProductRegistered(string Name, string Category); + +public class CompositeProductMetric +{ + public Guid Id { get; set; } + public double Price { get; set; } +} + +public class CompositeProductProjection: SingleStreamProjection +{ + private readonly ICompositePriceLookup _lookup; + + public CompositeProductProjection(ICompositePriceLookup lookup) + { + _lookup = lookup; + Name = "Feature4284Product"; + } + + public override CompositeProduct Evolve(CompositeProduct snapshot, Guid id, IEvent e) + { + snapshot ??= new CompositeProduct { Id = id }; + + if (e.Data is CompositeProductRegistered registered) + { + snapshot.Price = _lookup.PriceFor(registered.Category); + snapshot.Name = registered.Name; + snapshot.Category = registered.Category; + } + + return snapshot; + } +} + +public class CompositeProductMetricProjection: IProjection +{ + private readonly ICompositePriceLookup _lookup; + + public CompositeProductMetricProjection(ICompositePriceLookup lookup) + { + _lookup = lookup; + } + + public Task ApplyAsync(IDocumentOperations operations, IReadOnlyList events, + CancellationToken cancellation) + { + foreach (var e in events) + { + if (e.Data is CompositeProductRegistered registered) + { + operations.Store(new CompositeProductMetric + { + Id = e.StreamId, + Price = _lookup.PriceFor(registered.Category) + }); + } + } + + return Task.CompletedTask; + } +} diff --git a/src/Marten/Events/Projections/CompositeProjection.cs b/src/Marten/Events/Projections/CompositeProjection.cs index a6326a2355..1e8d4f35db 100644 --- a/src/Marten/Events/Projections/CompositeProjection.cs +++ b/src/Marten/Events/Projections/CompositeProjection.cs @@ -11,22 +11,40 @@ using JasperFx.Events.Descriptors; using JasperFx.Events.Projections; using JasperFx.Events.Projections.Composite; +using JasperFx.Events.Projections.ContainerScoped; using JasperFx.Events.Subscriptions; using Marten.Events.Aggregation; using Marten.Schema; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; namespace Marten.Events.Projections; +internal interface ICompositeProjectionServiceSource +{ + void AttachServiceProvider(IServiceProvider services); +} + public class CompositeProjection : CompositeProjection { private readonly StoreOptions _options; + private IServiceProvider? _services; internal CompositeProjection(string name, StoreOptions options, ProjectionOptions parent) : base(name) { _options = options; } + internal void AttachServiceProvider(IServiceProvider services) + { + _services = services; + + foreach (var source in AllProjections().OfType()) + { + source.AttachServiceProvider(services); + } + } + /// /// Add a snapshot (self-aggregating) projection to this composite. /// @@ -89,6 +107,403 @@ public void Add(IProjection projection, int stageNumber = 1) { Add(new T(), stageNumber); } + + /// + /// Add a projection that requires services from the application's IoC container to this composite. + /// + /// + /// The IoC lifecycle for the projection instance. Note that the Transient lifetime will still be + /// treated as Scoped. + /// + /// Optionally move the execution to a later stage. The default is 1 + /// Optional configuration of the projection name, version, event filtering, and async execution + /// The projection type to add + public void AddProjectionWithServices(ServiceLifetime lifetime = ServiceLifetime.Scoped, + int stageNumber = 1, Action? configure = null) where TProjection : class, IMartenRegistrable + { + var source = new CompositeProjectionWithServicesSource(lifetime, configure); + + if (_services != null) + { + source.AttachServiceProvider(_services); + } + + StageFor(stageNumber).Add(source); + } +} + +internal class CompositeProjectionWithServicesSource : + ProjectionBase, + IProjectionSource, + ISubscriptionFactory, + IAggregateProjection, + IMartenAggregateProjection, + ICompositeProjectionServiceSource where TProjection : class, IMartenRegistrable +{ + private readonly Type? _aggregateType; + private readonly Action? _configure; + private readonly Type? _identityType; + private readonly ServiceLifetime _lifetime; + private readonly AggregationScope _scope; + private IProjectionSource? _inner; + private IServiceProvider? _services; + + public CompositeProjectionWithServicesSource(ServiceLifetime lifetime, Action? configure) + { + _lifetime = lifetime; + _configure = configure; + Lifecycle = ProjectionLifecycle.Async; + Name = typeof(TProjection).Name; + Version = 1; + + if (typeof(TProjection).TryGetAttribute(out var att)) + { + Version = att.Version; + } + + _configure?.Invoke(this); + + if (tryFindAggregateTypes(typeof(TProjection), out var aggregateType, out var identityType, out var scope)) + { + _aggregateType = aggregateType; + _identityType = identityType; + _scope = scope; + RegisterPublishedType(aggregateType); + } + } + + public SubscriptionType Type => Source.Type; + public ShardName[] ShardNames() => Source.ShardNames(); + public Type ImplementationType => Source.ImplementationType; + public SubscriptionDescriptor Describe(IEventStore store) => Source.Describe(store); + + private IProjectionSource Source + { + get + { + if (_inner != null) + { + return _inner; + } + + if (_services == null) + { + throw new InvalidOperationException( + $"Projection {typeof(TProjection).FullNameInCode()} requires application services, but no IServiceProvider has been attached. Use this registration through AddMarten() so Marten can resolve the projection from DI."); + } + + _inner = buildSource(_services); + return _inner; + } + } + + public void AttachServiceProvider(IServiceProvider services) + { + _services = services; + _inner ??= buildSource(services); + } + + private IProjectionSource buildSource(IServiceProvider services) + { + IProjectionSource source; + + if (_lifetime == ServiceLifetime.Singleton) + { + source = buildSingletonSource(services); + } + else if (typeof(TProjection).CanBeCastTo() && + !typeof(TProjection).CanBeCastTo>()) + { + source = typeof(CompositeScopedIProjectionSource<>) + .CloseAndBuildAs(services, typeof(TProjection)) + .As>(); + } + else if (_aggregateType != null && _identityType != null) + { + var projectionServices = new ProjectionActivatingServiceProvider(services); + source = typeof(ScopedAggregationWrapper<,,,,>) + .CloseAndBuildAs(projectionServices, typeof(TProjection), _aggregateType, _identityType, + typeof(IDocumentOperations), typeof(IQuerySession)) + .As>(); + } + else + { + var projectionServices = new ProjectionActivatingServiceProvider(services); + source = typeof(ScopedProjectionWrapper<,,>) + .CloseAndBuildAs(projectionServices, typeof(TProjection), typeof(IDocumentOperations), + typeof(IQuerySession)) + .As>(); + } + + if (source is ProjectionBase projection) + { + projection.Lifecycle = ProjectionLifecycle.Async; + _configure?.Invoke(projection); + projection.Name = Name; + projection.OverwriteVersion(Version); + } + + return source; + } + + private static IProjectionSource buildSingletonSource(IServiceProvider services) + { + var projection = services.GetService() ?? + ActivatorUtilities.CreateInstance(services); + + if (projection is IProjection martenProjection && + projection is not IProjectionSource) + { + return new CompositeIProjectionSource(martenProjection); + } + + return projection.As>(); + } + + private static bool tryFindAggregateTypes(Type projectionType, out Type aggregateType, out Type identityType, + out AggregationScope scope) + { + var type = projectionType; + while (type != null && type != typeof(object)) + { + if (type.IsGenericType) + { + var definition = type.GetGenericTypeDefinition(); + if (definition == typeof(SingleStreamProjection<,>) || + definition == typeof(MultiStreamProjection<,>)) + { + var arguments = type.GetGenericArguments(); + aggregateType = arguments[0]; + identityType = arguments[1]; + scope = definition == typeof(SingleStreamProjection<,>) + ? AggregationScope.SingleStream + : AggregationScope.MultiStream; + return true; + } + } + + type = type.BaseType; + } + + aggregateType = null!; + identityType = null!; + scope = default; + return false; + } + + Type IAggregateProjection.IdentityType => _identityType ?? typeof(void); + Type IAggregateProjection.AggregateType => _aggregateType ?? typeof(void); + AggregationScope IAggregateProjection.Scope => _scope; + Type[] IAggregateProjection.AllEventTypes => + _aggregateType == null ? [] : Source.As().AllEventTypes; + NaturalKeyDefinition? IAggregateProjection.NaturalKeyDefinition => + _aggregateType == null ? null : Source.As().NaturalKeyDefinition; + + void IMartenAggregateProjection.ConfigureAggregateMapping(DocumentMapping mapping, StoreOptions storeOptions) + { + if (_services != null) + { + using var scope = _services.CreateScope(); + var projection = scope.ServiceProvider.GetService() ?? + ActivatorUtilities.CreateInstance(scope.ServiceProvider); + + if (projection is IMartenAggregateProjection martenAggregateProjection) + { + martenAggregateProjection.ConfigureAggregateMapping(mapping, storeOptions); + return; + } + } + + if (_scope == AggregationScope.SingleStream) + { + mapping.UseVersionFromMatchingStream = true; + } + } + + public IReadOnlyList> Shards() => Source.Shards(); + + public bool TryBuildReplayExecutor(IEventStore store, IEventDatabase database, + [NotNullWhen(true)] out IReplayExecutor? executor) + { + return Source.TryBuildReplayExecutor(store, database, out executor); + } + + public IInlineProjection BuildForInline() + { + throw new NotSupportedException("Composite projections do not support inline execution"); + } + + public ISubscriptionExecution BuildExecution(IEventStore store, + IEventDatabase database, ILoggerFactory loggerFactory, ShardName shardName) + { + return Source.As>() + .BuildExecution(store, database, loggerFactory, shardName); + } + + public ISubscriptionExecution BuildExecution(IEventStore store, + IEventDatabase database, ILogger logger, ShardName shardName) + { + return Source.As>() + .BuildExecution(store, database, logger, shardName); + } +} + +internal class CompositeScopedIProjectionSource : + ProjectionBase, + IProjectionSource, + ISubscriptionFactory where TProjection : class, IProjection +{ + private readonly IServiceProvider _services; + + public CompositeScopedIProjectionSource(IServiceProvider services) + { + _services = services; + Lifecycle = ProjectionLifecycle.Async; + Name = typeof(TProjection).Name; + Version = 1; + if (typeof(TProjection).TryGetAttribute(out var att)) + { + Version = att.Version; + } + } + + public SubscriptionType Type => SubscriptionType.EventProjection; + public ShardName[] ShardNames() => [new ShardName(Name, ShardName.All, Version)]; + public Type ImplementationType => typeof(TProjection); + public SubscriptionDescriptor Describe(IEventStore store) => new(this, store); + + public IReadOnlyList> Shards() + { + return + [ + new AsyncShard(Options, ShardRole.Projection, + new ShardName(Name, "All", Version), this, this) + ]; + } + + public bool TryBuildReplayExecutor(IEventStore store, IEventDatabase database, + [NotNullWhen(true)] out IReplayExecutor? executor) + { + executor = default; + return false; + } + + public IInlineProjection BuildForInline() + { + throw new NotSupportedException("CompositeScopedIProjectionSource does not support inline execution"); + } + + public ISubscriptionExecution BuildExecution(IEventStore store, + IEventDatabase database, ILoggerFactory loggerFactory, ShardName shardName) + { + return new CompositeScopedIProjectionExecution(_services, shardName); + } + + public ISubscriptionExecution BuildExecution(IEventStore store, + IEventDatabase database, ILogger logger, ShardName shardName) + { + return new CompositeScopedIProjectionExecution(_services, shardName); + } +} + +internal class CompositeScopedIProjectionExecution : ISubscriptionExecution where TProjection : class, IProjection +{ + private readonly IServiceProvider _services; + + public CompositeScopedIProjectionExecution(IServiceProvider services, ShardName shardName) + { + _services = services; + ShardName = shardName; + } + + public ShardName ShardName { get; } + public ShardExecutionMode Mode { get; set; } + + public async Task ProcessRangeAsync(EventRange range) + { + var batch = range.ActiveBatch as IProjectionBatch; + if (batch == null) return; + + var groups = range.Events.GroupBy(x => x.TenantId).ToArray(); + foreach (var group in groups) + { + using var scope = _services.CreateScope(); + var projection = ActivatorUtilities.GetServiceOrCreateInstance(scope.ServiceProvider); + + await using var session = batch.SessionForTenant(group.Key); + await projection.ApplyAsync(session, group.ToList(), CancellationToken.None).ConfigureAwait(false); + } + } + + public ValueTask EnqueueAsync(EventPage page, ISubscriptionAgent subscriptionAgent) => new(); + public Task StopAndDrainAsync(CancellationToken token) => Task.CompletedTask; + public Task HardStopAsync() => Task.CompletedTask; + + public bool TryBuildReplayExecutor([NotNullWhen(true)] out IReplayExecutor? executor) + { + executor = default; + return false; + } + + public Task ProcessImmediatelyAsync(SubscriptionAgent subscriptionAgent, EventPage events, + CancellationToken cancellation) => Task.CompletedTask; + + public bool TryGetAggregateCache([NotNullWhen(true)] out IAggregateCaching? caching) + { + caching = null; + return false; + } + + public ValueTask DisposeAsync() => new(); +} + +internal class ProjectionActivatingServiceProvider : IServiceProvider, IServiceScopeFactory + where TProjection : class +{ + private readonly IServiceProvider _inner; + + public ProjectionActivatingServiceProvider(IServiceProvider inner) + { + _inner = inner; + } + + public object? GetService(Type serviceType) + { + if (serviceType == typeof(IServiceScopeFactory)) + { + return this; + } + + if (serviceType == typeof(TProjection)) + { + return _inner.GetService(serviceType) ?? ActivatorUtilities.CreateInstance(_inner, serviceType); + } + + return _inner.GetService(serviceType); + } + + public IServiceScope CreateScope() + { + return new ProjectionActivatingServiceScope(_inner.CreateScope()); + } +} + +internal class ProjectionActivatingServiceScope : IServiceScope where TProjection : class +{ + private readonly IServiceScope _inner; + + public ProjectionActivatingServiceScope(IServiceScope inner) + { + _inner = inner; + ServiceProvider = new ProjectionActivatingServiceProvider(inner.ServiceProvider); + } + + public IServiceProvider ServiceProvider { get; } + + public void Dispose() + { + _inner.Dispose(); + } } /// diff --git a/src/Marten/Events/Projections/ProjectionOptions.cs b/src/Marten/Events/Projections/ProjectionOptions.cs index c93574c8c4..7de72fa207 100644 --- a/src/Marten/Events/Projections/ProjectionOptions.cs +++ b/src/Marten/Events/Projections/ProjectionOptions.cs @@ -71,6 +71,14 @@ internal IEnumerable allPlanners() foreach (var planner in _builtInPlanners) yield return planner; } + internal void AttachServiceProvider(IServiceProvider services) + { + foreach (var composite in All.OfType()) + { + composite.AttachServiceProvider(services); + } + } + internal IInlineProjection[] BuildInlineProjections(DocumentStore store) { var projections = All diff --git a/src/Marten/Internal/SecondaryStoreConfig.cs b/src/Marten/Internal/SecondaryStoreConfig.cs index b78f1bbe5b..ad9afba1f0 100644 --- a/src/Marten/Internal/SecondaryStoreConfig.cs +++ b/src/Marten/Internal/SecondaryStoreConfig.cs @@ -111,6 +111,7 @@ public StoreOptions BuildStoreOptions(IServiceProvider provider) options.ReadJasperFxOptions(provider.GetService()); options.StoreName = typeof(T).Name; options.ReadJasperFxOptions(provider.GetService()); + options.Projections.AttachServiceProvider(provider); return options; } diff --git a/src/Marten/MartenServiceCollectionExtensions.cs b/src/Marten/MartenServiceCollectionExtensions.cs index c4041dc6e5..96a550dbc5 100644 --- a/src/Marten/MartenServiceCollectionExtensions.cs +++ b/src/Marten/MartenServiceCollectionExtensions.cs @@ -178,6 +178,7 @@ Func optionSource options.ReadJasperFxOptions(s.GetService()); options.InitialData.AddRange(s.GetServices()); + options.Projections.AttachServiceProvider(s); return options; });