Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/Marten/Events/Aggregation/AggregationRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ public AggregationRuntime(IDocumentStore store, IAggregateProjection projection,
public IEventSlicer<TDoc, TId> Slicer { get; }
public IDocumentStorage<TDoc, TId> Storage { get; }

private bool shouldProcessSideEffects(DocumentSessionBase session, ProjectionLifecycle lifecycle)
{
if (session is ProjectionDocumentSession { Mode: ShardExecutionMode.Continuous }) return true;

return lifecycle == ProjectionLifecycle.Inline && session.Options.Events.EnableSideEffectsOnInlineProjections;
}

public async ValueTask ApplyChangesAsync(DocumentSessionBase session,
EventSlice<TDoc, TId> slice, CancellationToken cancellation,
ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline)
Expand All @@ -94,7 +101,7 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session,
{
var operation = Storage.DeleteForId(slice.Id, slice.Tenant.TenantId);

if (session is ProjectionDocumentSession { Mode: ShardExecutionMode.Continuous })
if (shouldProcessSideEffects(session, lifecycle))
{
await processPossibleSideEffects(session, slice).ConfigureAwait(false);
}
Expand Down Expand Up @@ -148,7 +155,7 @@ public async ValueTask ApplyChangesAsync(DocumentSessionBase session,

maybeArchiveStream(session, slice);

if (session is ProjectionDocumentSession { Mode: ShardExecutionMode.Continuous })
if (shouldProcessSideEffects(session, lifecycle))
{
// Need to set the aggregate in case it didn't exist upfront
slice.Aggregate = aggregate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ public async Task ProcessPageAsync(IDaemonRuntime runtime, ShardName shardName,
ShouldApplyListeners = false
};

await batch.InitializeMessageBatch().ConfigureAwait(false);

// Gotta use the current tenant if using conjoined tenancy
var sessionOptions = SessionOptions.ForDatabase(_session.TenantId, _session.Database);

Expand Down
2 changes: 1 addition & 1 deletion src/Marten/Events/Aggregation/SideEffectCode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface IMessageSink
ValueTask PublishAsync<T>(T message);
}

public interface ITenantedMessageSink
public interface ITenantedMessageSink : IMessageSink
{
ValueTask PublishAsync<T>(T message, string tenantId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ private async Task<ProjectionUpdateBatch> buildBatchAsync(EventRangeGroup group,
ShouldApplyListeners = group.Agent.Mode == ShardExecutionMode.Continuous && group.Range.Events.Any()
};

await batch.InitializeMessageBatch().ConfigureAwait(false);

// Mark the progression
batch.Queue.Post(group.Range.BuildProgressionOperation(_store.Events));

Expand Down
30 changes: 10 additions & 20 deletions src/Marten/Events/Daemon/Internals/ProjectionUpdateBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class ProjectionUpdateBatch: IUpdateBatch, IAsyncDisposable, IDisposable,
private readonly DaemonSettings _settings;
private readonly CancellationToken _token;
private OperationPage? _current;
private DocumentSessionBase? _session;
private DocumentSessionBase _session;

private IMartenSession Session
{
Expand All @@ -41,6 +41,7 @@ private IMartenSession Session
internal ProjectionUpdateBatch(DaemonSettings settings,
DocumentSessionBase? session, ShardExecutionMode mode, CancellationToken token)
{

_settings = settings;
_session = session ?? throw new ArgumentNullException(nameof(session));
_token = token;
Expand All @@ -54,6 +55,12 @@ internal ProjectionUpdateBatch(DaemonSettings settings,
startNewPage(session);
}

public async Task InitializeMessageBatch()
{
_batch = await _session!.Options.Events.MessageOutbox.CreateBatch(_session).ConfigureAwait(false);
Listeners.Add(_batch);
}

public async Task WaitForCompletion()
{
Queue.Complete();
Expand Down Expand Up @@ -335,25 +342,8 @@ protected void Dispose(bool disposing)
}

private IMessageBatch? _batch;
private readonly SemaphoreSlim _semaphore = new(1, 1);
public async ValueTask<IMessageBatch> CurrentMessageBatch(DocumentSessionBase session)
public ValueTask<IMessageBatch> CurrentMessageBatch(DocumentSessionBase session)
{
if (_batch != null) return _batch;

await _semaphore.WaitAsync(_token).ConfigureAwait(false);

if (_batch != null) return _batch;

try
{
_batch = await _session.Options.Events.MessageOutbox.CreateBatch(session).ConfigureAwait(false);
Listeners.Add(_batch);

return _batch;
}
finally
{
_semaphore.Release();
}
return new ValueTask<IMessageBatch>(_batch);
}
}
6 changes: 6 additions & 0 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ internal EventGraph(StoreOptions options)
/// </summary>
public bool EnableUniqueIndexOnEventId { get; set; } = false;

/// <summary>
/// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while
/// running in an Inline lifecycle. Default is false;
/// </summary>
public bool EnableSideEffectsOnInlineProjections { get; set; } = false;

/// <summary>
/// Configure whether event streams are identified with Guid or strings
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/Marten/Events/IEventStoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ bool UseIdentityMapForInlineAggregates
/// </summary>
public bool UseMandatoryStreamTypeDeclaration { get; set; }

/// <summary>
/// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while
/// running in an Inline lifecycle. Default is false;
/// </summary>
bool EnableSideEffectsOnInlineProjections { get; set; }

/// <summary>
/// Register an event type with Marten. This isn't strictly necessary for normal usage,
/// but can help Marten with asynchronous projections where Marten hasn't yet encountered
Expand Down
6 changes: 6 additions & 0 deletions src/Marten/Events/IReadOnlyEventStoreOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,10 @@ public interface IReadOnlyEventStoreOptions
/// but this will be true in 8.0
/// </summary>
bool UseMandatoryStreamTypeDeclaration { get; set; }

/// <summary>
/// Opt into having Marten process "side effects" on aggregation projections (SingleStreamProjection/MultiStreamProjection) while
/// running in an Inline lifecycle. Default is false;
/// </summary>
public bool EnableSideEffectsOnInlineProjections { get; set; }
}
5 changes: 0 additions & 5 deletions src/Marten/Events/Projections/ProjectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,6 @@ public void Add<TProjection>(
)
where TProjection : GeneratedProjection, new()
{
if (lifecycle == ProjectionLifecycle.Live)
{
throw new InvalidOperationException("The generic overload of Add does not support Live projections, please use the non-generic overload.");
}

var projection = new TProjection { Lifecycle = lifecycle };

asyncConfiguration?.Invoke(projection.Options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,17 @@ public async Task SaveChangesAsync(CancellationToken token = default)

await ExecuteBatchAsync(batch, token).ConfigureAwait(false);

if (_messageBatch != null)
{
await _messageBatch.AfterCommitAsync(this, _workTracker, token).ConfigureAwait(false);
}

resetDirtyChecking();

EjectPatchedTypes(_workTracker);
Logger.RecordSavedChanges(this, _workTracker);

foreach (var listener in Listeners)
foreach (var listener in Listeners.ToArray())
{
await listener.AfterCommitAsync(this, _workTracker, token).ConfigureAwait(false);
}
Expand Down
9 changes: 8 additions & 1 deletion src/Marten/Internal/Sessions/DocumentSessionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ internal ValueTask<IMessageBatch> CurrentMessageBatch()
return batch.CurrentMessageBatch(this);
}

throw new InvalidOperationException("This session is not a ProjectionDocumentSession");
return StartMessageBatch();
}

internal ITenancy Tenancy => DocumentStore.As<DocumentStore>().Tenancy;
Expand Down Expand Up @@ -470,4 +470,11 @@ internal bool TryGetAggregateFromIdentityMap<TDoc, TId>(TId id, out TDoc documen
document = default;
return false;
}

private IMessageBatch? _messageBatch;
internal async ValueTask<IMessageBatch> StartMessageBatch()
{
_messageBatch ??= await Options.Events.MessageOutbox.CreateBatch(this).ConfigureAwait(false);
return _messageBatch;
}
}
4 changes: 3 additions & 1 deletion src/Marten/Subscriptions/SubscriptionExecution.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ private async Task executeRange(EventRange range)

var batch = new ProjectionUpdateBatch(_store.Options.Projections, parent, Mode, _cancellation.Token) {
ShouldApplyListeners = Mode == ShardExecutionMode.Continuous && range.Events.Any()
};;
};

await batch.InitializeMessageBatch().ConfigureAwait(false);

// Mark the progression
batch.Queue.Post(range.BuildProgressionOperation(_store.Events));
Expand Down
Loading