diff --git a/src/JasperFx.Events/Aggregation/JasperFxAggregationProjectionBase.Runtime.cs b/src/JasperFx.Events/Aggregation/JasperFxAggregationProjectionBase.Runtime.cs index afdb6d7..c8e2d46 100644 --- a/src/JasperFx.Events/Aggregation/JasperFxAggregationProjectionBase.Runtime.cs +++ b/src/JasperFx.Events/Aggregation/JasperFxAggregationProjectionBase.Runtime.cs @@ -111,7 +111,7 @@ public virtual (TDoc?, ActionType) DetermineAction(TDoc? snapshot, TId identity, return new(snapshot, ActionType.Delete); } - + snapshot = await _evolve(snapshot, identity, session, events, cancellation); if (snapshot == null) @@ -141,6 +141,19 @@ public virtual (TDoc?, ActionType) DetermineAction(TDoc? snapshot, TId identity, public virtual ValueTask EvolveAsync(TDoc? snapshot, TId id, TQuerySession session, IEvent e, CancellationToken cancellation) { + // Archived is a terminal stream marker, not an aggregate-creating event. In a + // composite with multiple single-stream children, a sibling projection may see + // an Archived event for a stream it does not own — if that projection defines + // Create(Archived), the default flow would otherwise materialize a phantom + // aggregate under the other projection's stream id. Guard: when no snapshot + // exists, skip Archived. Once a snapshot exists (either pre-loaded or created + // by a preceding event in the same slice), Apply(Archived) hooks still run. + // See issue JasperFx/marten#4093. + if (snapshot == null && e is IEvent) + { + return new ValueTask(default(TDoc)); + } + return (snapshot == null ? _application.Create(e, session, cancellation) : _application.ApplyAsync(snapshot, e, session, cancellation)!)!; diff --git a/src/JasperFx.Events/Aggregation/JasperFxSingleStreamProjectionBase.cs b/src/JasperFx.Events/Aggregation/JasperFxSingleStreamProjectionBase.cs index dffea29..9cfb8c9 100644 --- a/src/JasperFx.Events/Aggregation/JasperFxSingleStreamProjectionBase.cs +++ b/src/JasperFx.Events/Aggregation/JasperFxSingleStreamProjectionBase.cs @@ -97,10 +97,15 @@ async Task IInlineProjection.ApplyAsync(TOperations session, IReadO (_, transformed) = tryApplyMetadata(stream.Events, transformed, id, storage); if (transformed == null && action != ActionType.Delete && action != ActionType.HardDelete) continue; - + storage.ApplyInline(transformed, action, id, stream.TenantId); - - maybeArchiveStream(storage, stream, id); + + // Gate archival on whether this projection owns the stream. Ownership + // is signalled by a pre-loaded snapshot OR a materialized one from the + // slice. In a composite projection with multiple single-stream children, + // sibling projections that do not own this stream skip the archive. + // See issue JasperFx/marten#4093. + maybeArchiveStream(storage, stream, id, ownsStream: snapshot != null || transformed != null); if (session.EnableSideEffectsOnInlineProjections) { @@ -134,9 +139,18 @@ private async Task processSideEffectMessages(TOperations session, TId id, Stream } } - private void maybeArchiveStream(IProjectionStorage storage, StreamAction action, TId id) + private void maybeArchiveStream(IProjectionStorage storage, StreamAction action, TId id, bool ownsStream) { - if (Scope == AggregationScope.SingleStream && action.Events.OfType>().Any()) + if (Scope != AggregationScope.SingleStream) return; + + // Only the single-stream projection that actually owns the stream — as signalled + // by a snapshot being present either before or after the slice is applied — + // should archive the stream. In a composite projection with multiple single + // stream children, sibling projections otherwise fire redundant (or phantom) + // stream-archival operations. See issue JasperFx/marten#4093. + if (!ownsStream) return; + + if (action.Events.OfType>().Any()) { storage.ArchiveStream(id, action.TenantId); } diff --git a/src/JasperFx.Events/Daemon/AggregationRunner.cs b/src/JasperFx.Events/Daemon/AggregationRunner.cs index fc9842c..b4f9f3b 100644 --- a/src/JasperFx.Events/Daemon/AggregationRunner.cs +++ b/src/JasperFx.Events/Daemon/AggregationRunner.cs @@ -186,7 +186,11 @@ public async Task ApplyChangesAsync(ShardExecutionMode mode, await processPossibleSideEffects(batch, operations, slice).ConfigureAwait(false); } - maybeArchiveStream(storage, slice); + // Only archive from the perspective of a projection that actually owns the stream. + // The delete-type branch only fires when a pre-existing snapshot is present + // (otherwise buildActionAsync returns Nothing), so slice.Snapshot signals ownership. + // See issue JasperFx/marten#4093. + maybeArchiveStream(storage, slice, ownsStream: slice.Snapshot != null); storage.Delete(slice.Id); return; } @@ -195,7 +199,7 @@ public async Task ApplyChangesAsync(ShardExecutionMode mode, slice.Events(), cancellation); slice.RecordAction(action); - + if (action == ActionType.Nothing) { return; @@ -203,7 +207,10 @@ public async Task ApplyChangesAsync(ShardExecutionMode mode, (var lastEvent, snapshot) = Projection.TryApplyMetadata(slice.Events(), snapshot, slice.Id, storage); - maybeArchiveStream(storage, slice); + // Ownership: either there was a pre-loaded snapshot or the slice's events + // materialized one. Siblings that did neither skip the archive. + // See issue JasperFx/marten#4093. + maybeArchiveStream(storage, slice, ownsStream: slice.Snapshot != null || snapshot != null); if (mode == ShardExecutionMode.Continuous) { @@ -263,9 +270,18 @@ public IAggregateCache CacheFor(string tenantId) } } - private void maybeArchiveStream(IProjectionStorage storage, EventSlice slice) + private void maybeArchiveStream(IProjectionStorage storage, EventSlice slice, bool ownsStream) { - if (Projection.Scope == AggregationScope.SingleStream && slice.Events().OfType>().Any()) + if (Projection.Scope != AggregationScope.SingleStream) return; + + // Only the single-stream projection that actually owns the stream — as signalled + // by a snapshot being present either before or after the slice is applied — + // should archive the stream. In a composite projection with multiple single + // stream children, sibling projections otherwise fire redundant (or phantom) + // stream-archival operations. See issue JasperFx/marten#4093. + if (!ownsStream) return; + + if (slice.Events().OfType>().Any()) { storage.ArchiveStream(slice.Id, slice.TenantId); } diff --git a/src/JasperFx.Events/JasperFx.Events.csproj b/src/JasperFx.Events/JasperFx.Events.csproj index 4765eea..76ea3aa 100644 --- a/src/JasperFx.Events/JasperFx.Events.csproj +++ b/src/JasperFx.Events/JasperFx.Events.csproj @@ -3,7 +3,7 @@ Foundational Event Store Abstractions and Projections for the Critter Stack JasperFx.Events - 1.27.0 + 1.28.0