Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public virtual (TDoc?, ActionType) DetermineAction(TDoc? snapshot, TId identity,

return new(snapshot, ActionType.Delete);
}

snapshot = await _evolve(snapshot, identity, session, events, cancellation);

if (snapshot == null)
Expand Down Expand Up @@ -141,6 +141,19 @@ public virtual (TDoc?, ActionType) DetermineAction(TDoc? snapshot, TId identity,
public virtual ValueTask<TDoc?> EvolveAsync(TDoc? snapshot, TId id, TQuerySession session, IEvent e,
CancellationToken cancellation)
{
// Archived is a terminal stream marker, not an aggregate-creating event. In a
// composite with multiple single-stream children, a sibling projection may see
// an Archived event for a stream it does not own — if that projection defines
// Create(Archived), the default flow would otherwise materialize a phantom
// aggregate under the other projection's stream id. Guard: when no snapshot
// exists, skip Archived. Once a snapshot exists (either pre-loaded or created
// by a preceding event in the same slice), Apply(Archived) hooks still run.
// See issue JasperFx/marten#4093.
if (snapshot == null && e is IEvent<Archived>)
{
return new ValueTask<TDoc?>(default(TDoc));
}

return (snapshot == null
? _application.Create(e, session, cancellation)
: _application.ApplyAsync(snapshot, e, session, cancellation)!)!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,15 @@ async Task IInlineProjection<TOperations>.ApplyAsync(TOperations session, IReadO
(_, transformed) = tryApplyMetadata(stream.Events, transformed, id, storage);

if (transformed == null && action != ActionType.Delete && action != ActionType.HardDelete) continue;

storage.ApplyInline(transformed, action, id, stream.TenantId);

maybeArchiveStream(storage, stream, id);

// Gate archival on whether this projection owns the stream. Ownership
// is signalled by a pre-loaded snapshot OR a materialized one from the
// slice. In a composite projection with multiple single-stream children,
// sibling projections that do not own this stream skip the archive.
// See issue JasperFx/marten#4093.
maybeArchiveStream(storage, stream, id, ownsStream: snapshot != null || transformed != null);

if (session.EnableSideEffectsOnInlineProjections)
{
Expand Down Expand Up @@ -134,9 +139,18 @@ private async Task processSideEffectMessages(TOperations session, TId id, Stream
}
}

private void maybeArchiveStream(IProjectionStorage<TDoc, TId> storage, StreamAction action, TId id)
private void maybeArchiveStream(IProjectionStorage<TDoc, TId> storage, StreamAction action, TId id, bool ownsStream)
{
if (Scope == AggregationScope.SingleStream && action.Events.OfType<IEvent<Archived>>().Any())
if (Scope != AggregationScope.SingleStream) return;

// Only the single-stream projection that actually owns the stream — as signalled
// by a snapshot being present either before or after the slice is applied —
// should archive the stream. In a composite projection with multiple single
// stream children, sibling projections otherwise fire redundant (or phantom)
// stream-archival operations. See issue JasperFx/marten#4093.
if (!ownsStream) return;

if (action.Events.OfType<IEvent<Archived>>().Any())
{
storage.ArchiveStream(id, action.TenantId);
}
Expand Down
26 changes: 21 additions & 5 deletions src/JasperFx.Events/Daemon/AggregationRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,11 @@ public async Task ApplyChangesAsync(ShardExecutionMode mode,
await processPossibleSideEffects(batch, operations, slice).ConfigureAwait(false);
}

maybeArchiveStream(storage, slice);
// Only archive from the perspective of a projection that actually owns the stream.
// The delete-type branch only fires when a pre-existing snapshot is present
// (otherwise buildActionAsync returns Nothing), so slice.Snapshot signals ownership.
// See issue JasperFx/marten#4093.
maybeArchiveStream(storage, slice, ownsStream: slice.Snapshot != null);
storage.Delete(slice.Id);
return;
}
Expand All @@ -195,15 +199,18 @@ public async Task ApplyChangesAsync(ShardExecutionMode mode,
slice.Events(), cancellation);

slice.RecordAction(action);

if (action == ActionType.Nothing)
{
return;
}

(var lastEvent, snapshot) = Projection.TryApplyMetadata(slice.Events(), snapshot, slice.Id, storage);

maybeArchiveStream(storage, slice);
// Ownership: either there was a pre-loaded snapshot or the slice's events
// materialized one. Siblings that did neither skip the archive.
// See issue JasperFx/marten#4093.
maybeArchiveStream(storage, slice, ownsStream: slice.Snapshot != null || snapshot != null);

if (mode == ShardExecutionMode.Continuous)
{
Expand Down Expand Up @@ -263,9 +270,18 @@ public IAggregateCache<TId, TDoc> CacheFor(string tenantId)
}
}

private void maybeArchiveStream(IProjectionStorage<TDoc, TId> storage, EventSlice<TDoc, TId> slice)
private void maybeArchiveStream(IProjectionStorage<TDoc, TId> storage, EventSlice<TDoc, TId> slice, bool ownsStream)
{
if (Projection.Scope == AggregationScope.SingleStream && slice.Events().OfType<IEvent<Archived>>().Any())
if (Projection.Scope != AggregationScope.SingleStream) return;

// Only the single-stream projection that actually owns the stream — as signalled
// by a snapshot being present either before or after the slice is applied —
// should archive the stream. In a composite projection with multiple single
// stream children, sibling projections otherwise fire redundant (or phantom)
// stream-archival operations. See issue JasperFx/marten#4093.
if (!ownsStream) return;

if (slice.Events().OfType<IEvent<Archived>>().Any())
{
storage.ArchiveStream(slice.Id, slice.TenantId);
}
Expand Down
2 changes: 1 addition & 1 deletion src/JasperFx.Events/JasperFx.Events.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<PropertyGroup>
<Description>Foundational Event Store Abstractions and Projections for the Critter Stack</Description>
<PackageId>JasperFx.Events</PackageId>
<Version>1.27.0</Version>
<Version>1.28.0</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading