diff --git a/src/Marten/Events/Aggregation/AggregationRuntime.cs b/src/Marten/Events/Aggregation/AggregationRuntime.cs index eda502d9db..036690e9f8 100644 --- a/src/Marten/Events/Aggregation/AggregationRuntime.cs +++ b/src/Marten/Events/Aggregation/AggregationRuntime.cs @@ -84,6 +84,13 @@ public AggregationRuntime(IDocumentStore store, IAggregateProjection projection, public IEventSlicer Slicer { get; } public IDocumentStorage Storage { get; } + private bool shouldProcessSideEffects(DocumentSessionBase session, ProjectionLifecycle lifecycle) + { + if (session is ProjectionDocumentSession { Mode: ShardExecutionMode.Continuous }) return true; + + return lifecycle == ProjectionLifecycle.Inline && session.Options.Events.EnableSideEffectsOnInlineProjections; + } + public async ValueTask ApplyChangesAsync(DocumentSessionBase session, EventSlice slice, CancellationToken cancellation, ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline) @@ -94,7 +101,7 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session, { var operation = Storage.DeleteForId(slice.Id, slice.Tenant.TenantId); - if (session is ProjectionDocumentSession { Mode: ShardExecutionMode.Continuous }) + if (shouldProcessSideEffects(session, lifecycle)) { await processPossibleSideEffects(session, slice).ConfigureAwait(false); } @@ -148,7 +155,7 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session, maybeArchiveStream(session, slice); - if (session is ProjectionDocumentSession { Mode: ShardExecutionMode.Continuous }) + if (shouldProcessSideEffects(session, lifecycle)) { // Need to set the aggregate in case it didn't exist upfront slice.Aggregate = aggregate; diff --git a/src/Marten/Events/Aggregation/Rebuilds/AggregatePageHandler.cs b/src/Marten/Events/Aggregation/Rebuilds/AggregatePageHandler.cs index ad9c367fff..ce989f74d6 100644 --- a/src/Marten/Events/Aggregation/Rebuilds/AggregatePageHandler.cs +++ b/src/Marten/Events/Aggregation/Rebuilds/AggregatePageHandler.cs @@ -153,6 +153,8 @@ public async Task ProcessPageAsync(IDaemonRuntime runtime, ShardName shardName, ShouldApplyListeners = false }; + await batch.InitializeMessageBatch().ConfigureAwait(false); + // Gotta use the current tenant if using conjoined tenancy var sessionOptions = SessionOptions.ForDatabase(_session.TenantId, _session.Database); diff --git a/src/Marten/Events/Aggregation/SideEffectCode.cs b/src/Marten/Events/Aggregation/SideEffectCode.cs index 2d5e46e89d..edd2b085d2 100644 --- a/src/Marten/Events/Aggregation/SideEffectCode.cs +++ b/src/Marten/Events/Aggregation/SideEffectCode.cs @@ -10,7 +10,7 @@ public interface IMessageSink ValueTask PublishAsync(T message); } -public interface ITenantedMessageSink +public interface ITenantedMessageSink : IMessageSink { ValueTask PublishAsync(T message, string tenantId); } diff --git a/src/Marten/Events/Daemon/Internals/GroupedProjectionExecution.cs b/src/Marten/Events/Daemon/Internals/GroupedProjectionExecution.cs index 8d909cfa21..1db9267213 100644 --- a/src/Marten/Events/Daemon/Internals/GroupedProjectionExecution.cs +++ b/src/Marten/Events/Daemon/Internals/GroupedProjectionExecution.cs @@ -220,6 +220,8 @@ private async Task buildBatchAsync(EventRangeGroup group, ShouldApplyListeners = group.Agent.Mode == ShardExecutionMode.Continuous && group.Range.Events.Any() }; + await batch.InitializeMessageBatch().ConfigureAwait(false); + // Mark the progression batch.Queue.Post(group.Range.BuildProgressionOperation(_store.Events)); diff --git a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs index 246542dd05..03623c5ac3 100644 --- a/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs +++ b/src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs @@ -25,7 +25,7 @@ public class ProjectionUpdateBatch: IUpdateBatch, IAsyncDisposable, IDisposable, private readonly DaemonSettings _settings; private readonly CancellationToken _token; private OperationPage? _current; - private DocumentSessionBase? _session; + private DocumentSessionBase _session; private IMartenSession Session { @@ -41,6 +41,7 @@ private IMartenSession Session internal ProjectionUpdateBatch(DaemonSettings settings, DocumentSessionBase? session, ShardExecutionMode mode, CancellationToken token) { + _settings = settings; _session = session ?? throw new ArgumentNullException(nameof(session)); _token = token; @@ -54,6 +55,12 @@ internal ProjectionUpdateBatch(DaemonSettings settings, startNewPage(session); } + public async Task InitializeMessageBatch() + { + _batch = await _session!.Options.Events.MessageOutbox.CreateBatch(_session).ConfigureAwait(false); + Listeners.Add(_batch); + } + public async Task WaitForCompletion() { Queue.Complete(); @@ -335,25 +342,8 @@ protected void Dispose(bool disposing) } private IMessageBatch? _batch; - private readonly SemaphoreSlim _semaphore = new(1, 1); - public async ValueTask CurrentMessageBatch(DocumentSessionBase session) + public ValueTask CurrentMessageBatch(DocumentSessionBase session) { - if (_batch != null) return _batch; - - await _semaphore.WaitAsync(_token).ConfigureAwait(false); - - if (_batch != null) return _batch; - - try - { - _batch = await _session.Options.Events.MessageOutbox.CreateBatch(session).ConfigureAwait(false); - Listeners.Add(_batch); - - return _batch; - } - finally - { - _semaphore.Release(); - } + return new ValueTask(_batch); } } diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index 7b395b283b..033e49a68b 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -115,6 +115,12 @@ internal EventGraph(StoreOptions options) /// public bool EnableUniqueIndexOnEventId { get; set; } = false; + /// + /// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while + /// running in an Inline lifecycle. Default is false; + /// + public bool EnableSideEffectsOnInlineProjections { get; set; } = false; + /// /// Configure whether event streams are identified with Guid or strings /// diff --git a/src/Marten/Events/IEventStoreOptions.cs b/src/Marten/Events/IEventStoreOptions.cs index 07440546c3..4a0ba1aef6 100644 --- a/src/Marten/Events/IEventStoreOptions.cs +++ b/src/Marten/Events/IEventStoreOptions.cs @@ -106,6 +106,12 @@ bool UseIdentityMapForInlineAggregates /// public bool UseMandatoryStreamTypeDeclaration { get; set; } + /// + /// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while + /// running in an Inline lifecycle. Default is false; + /// + bool EnableSideEffectsOnInlineProjections { get; set; } + /// /// Register an event type with Marten. This isn't strictly necessary for normal usage, /// but can help Marten with asynchronous projections where Marten hasn't yet encountered diff --git a/src/Marten/Events/IReadOnlyEventStoreOptions.cs b/src/Marten/Events/IReadOnlyEventStoreOptions.cs index 291f95cfe4..c5cd280883 100644 --- a/src/Marten/Events/IReadOnlyEventStoreOptions.cs +++ b/src/Marten/Events/IReadOnlyEventStoreOptions.cs @@ -83,4 +83,10 @@ public interface IReadOnlyEventStoreOptions /// but this will be true in 8.0 /// bool UseMandatoryStreamTypeDeclaration { get; set; } + + /// + /// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while + /// running in an Inline lifecycle. Default is false; + /// + public bool EnableSideEffectsOnInlineProjections { get; set; } } diff --git a/src/Marten/Events/Projections/ProjectionOptions.cs b/src/Marten/Events/Projections/ProjectionOptions.cs index e594496248..cc879aa7eb 100644 --- a/src/Marten/Events/Projections/ProjectionOptions.cs +++ b/src/Marten/Events/Projections/ProjectionOptions.cs @@ -356,11 +356,6 @@ public void Add( ) where TProjection : GeneratedProjection, new() { - if (lifecycle == ProjectionLifecycle.Live) - { - throw new InvalidOperationException("The generic overload of Add does not support Live projections, please use the non-generic overload."); - } - var projection = new TProjection { Lifecycle = lifecycle }; asyncConfiguration?.Invoke(projection.Options); diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.SaveChanges.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.SaveChanges.cs index 5c0e981082..a525e80087 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.SaveChanges.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.SaveChanges.cs @@ -104,12 +104,17 @@ public async Task SaveChangesAsync(CancellationToken token = default) await ExecuteBatchAsync(batch, token).ConfigureAwait(false); + if (_messageBatch != null) + { + await _messageBatch.AfterCommitAsync(this, _workTracker, token).ConfigureAwait(false); + } + resetDirtyChecking(); EjectPatchedTypes(_workTracker); Logger.RecordSavedChanges(this, _workTracker); - foreach (var listener in Listeners) + foreach (var listener in Listeners.ToArray()) { await listener.AfterCommitAsync(this, _workTracker, token).ConfigureAwait(false); } diff --git a/src/Marten/Internal/Sessions/DocumentSessionBase.cs b/src/Marten/Internal/Sessions/DocumentSessionBase.cs index fdbad417b7..85e1ebc7b7 100644 --- a/src/Marten/Internal/Sessions/DocumentSessionBase.cs +++ b/src/Marten/Internal/Sessions/DocumentSessionBase.cs @@ -53,7 +53,7 @@ internal ValueTask CurrentMessageBatch() return batch.CurrentMessageBatch(this); } - throw new InvalidOperationException("This session is not a ProjectionDocumentSession"); + return StartMessageBatch(); } internal ITenancy Tenancy => DocumentStore.As().Tenancy; @@ -470,4 +470,11 @@ internal bool TryGetAggregateFromIdentityMap(TId id, out TDoc documen document = default; return false; } + + private IMessageBatch? _messageBatch; + internal async ValueTask StartMessageBatch() + { + _messageBatch ??= await Options.Events.MessageOutbox.CreateBatch(this).ConfigureAwait(false); + return _messageBatch; + } } diff --git a/src/Marten/Subscriptions/SubscriptionExecution.cs b/src/Marten/Subscriptions/SubscriptionExecution.cs index 7a05250689..1bbed2a347 100644 --- a/src/Marten/Subscriptions/SubscriptionExecution.cs +++ b/src/Marten/Subscriptions/SubscriptionExecution.cs @@ -52,7 +52,9 @@ private async Task executeRange(EventRange range) var batch = new ProjectionUpdateBatch(_store.Options.Projections, parent, Mode, _cancellation.Token) { ShouldApplyListeners = Mode == ShardExecutionMode.Continuous && range.Events.Any() - };; + }; + + await batch.InitializeMessageBatch().ConfigureAwait(false); // Mark the progression batch.Queue.Post(range.BuildProgressionOperation(_store.Events));