Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,48 @@ public async Task can_use_version_metadata_on_existing_stream_with_expected_vers
ProjectionWithVersions.VersionsSeen.ShouldBe([5, 6, 7, 8]);

}

// Regression for #4481: when an Evolve-based inline projection's stream is
// fetched via FetchForWriting and no events are appended, an unrelated
// document insert in the same session was failing with
// JasperFx.ConcurrencyException because the empty stream still flowed
// through the inline-projection Apply step and queued a storage operation
// that did an optimistic-concurrency check against the unchanged version.
[Fact]
public async Task fetch_for_writing_without_appending_does_not_block_unrelated_inserts()
{
StoreOptions(opts =>
{
opts.Projections.Add<ProjectionWithVersions>(ProjectionLifecycle.Inline);
opts.Schema.For<UnrelatedDoc>().Identity(x => x.Id);
});

var streamId = Guid.NewGuid();

await using (var session = theStore.LightweightSession())
{
session.Events.StartStream<VersionedGuy>(streamId, new AEvent(), new BEvent());
await session.SaveChangesAsync();
}

await using (var session = theStore.LightweightSession())
{
// Fetch the stream but deliberately do not append any new events.
await session.Events.FetchForWriting<VersionedGuy>(streamId);

// Insert an unrelated document on the same session.
session.Store(new UnrelatedDoc { Id = streamId, Note = "no new events" });

// Should NOT throw — the empty stream must not propagate an
// optimistic-concurrency check on the unchanged aggregate.
await session.SaveChangesAsync();
}

await using var verify = theStore.QuerySession();
var unrelated = await verify.LoadAsync<UnrelatedDoc>(streamId);
unrelated.ShouldNotBeNull();
unrelated.Note.ShouldBe("no new events");
}
}

public class ProjectionWithVersions : SingleStreamProjection<VersionedGuy, Guid>
Expand Down Expand Up @@ -166,3 +208,9 @@ public class VersionedGuy

public int Version { get; set; }
}

public class UnrelatedDoc
{
public Guid Id { get; set; }
public string Note { get; set; } = "";
}
7 changes: 6 additions & 1 deletion src/Marten/Events/QuickEventAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,14 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase
{
registerOperationsForStreams(eventGraph, session);

// Issue #4481: only pass streams that actually have events. See the
// matching guard in RichEventAppender.ProcessEventsAsync for context.
var streamsWithEvents = session.WorkTracker.Streams.Where(x => x.Events.Any()).ToList();
if (streamsWithEvents.Count == 0) return;

foreach (var projection in inlineProjections)
{
await projection.ApplyAsync(session, session.WorkTracker.Streams.ToList(), token).ConfigureAwait(false);
await projection.ApplyAsync(session, streamsWithEvents, token).ConfigureAwait(false);
}
}
}
19 changes: 17 additions & 2 deletions src/Marten/Events/RichEventAppender.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,24 @@ public async Task ProcessEventsAsync(EventGraph eventGraph, DocumentSessionBase
}

// TODO -- look for opportunities to batch up the requests here too!
foreach (var projection in inlineProjections)
//
// Issue #4481: only pass streams that actually have events. An empty
// stream (e.g. FetchForWriting<TAggregate> called without any
// subsequent AppendOne/AppendMany) used to slip through here and
// trigger an inline projection's snapshot-write path on the unchanged
// aggregate, raising a JasperFx.ConcurrencyException on the next
// SaveChangesAsync for any other work on the same session. The
// upstream JasperFx aggregation base's `AppliesTo(eventTypes)`
// returns `true` unconditionally when the projection has no
// statically-known event types (Evolve-only projections), so the
// empty stream was not being filtered downstream.
var streamsWithEvents = session.WorkTracker.Streams.Where(x => x.Events.Any()).ToList();
if (streamsWithEvents.Count > 0)
{
await projection.ApplyAsync(session, session.WorkTracker.Streams.ToList(), token).ConfigureAwait(false);
foreach (var projection in inlineProjections)
{
await projection.ApplyAsync(session, streamsWithEvents, token).ConfigureAwait(false);
}
}
}
}
Expand Down