diff --git a/docs/configuration/cli.md b/docs/configuration/cli.md index a0d77c35dd..efde448c94 100644 --- a/docs/configuration/cli.md +++ b/docs/configuration/cli.md @@ -233,4 +233,3 @@ marten - Advanced Marten operations to 'heal' event store projection issues or r [-l, --log-level Trace|Debug|Information|Warning|Error|Critical|None] Override the log level [--config: ] Overwrite individual configuration items ``` - diff --git a/src/EventSourcingTests/FetchForWriting/fetch_latest_async_aggregate.cs b/src/EventSourcingTests/FetchForWriting/fetch_latest_async_aggregate.cs index 805ddca31c..a3745fc24d 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_latest_async_aggregate.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_latest_async_aggregate.cs @@ -5,6 +5,7 @@ using JasperFx.Events; using Marten.Events; using Marten.Events.Projections; +using Marten.Testing.Documents; using Marten.Testing.Harness; using Shouldly; using Xunit; @@ -35,6 +36,32 @@ public async Task from_no_current_activity_guid_centric() document.CCount.ShouldBe(3); } + [Fact] + public async Task from_no_current_activity_guid_centric_as_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }); + + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + await using var query = theStore.LightweightSession(); + var batch = query.CreateBatchQuery(); + var documentQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + + var document = await documentQuery; + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + } + [Fact] public async Task from_no_current_activity_string_centric() @@ -59,6 +86,31 @@ public async Task from_no_current_activity_string_centric() document.CCount.ShouldBe(3); } + [Fact] + public async Task 
from_no_current_activity_string_centric_from_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + await using var query = theStore.LightweightSession(); + var batch = query.CreateBatchQuery(); + var documentQuery = batch.Events.FetchLatest(streamId); await batch.Execute(); + var document = await documentQuery; + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + } + [Fact] public async Task from_after_fetch_for_writing_guid_centric_brand_new_1() { @@ -81,6 +133,34 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_new_1() aggregate.CCount.ShouldBe(3); } + [Fact] + public async Task from_after_fetch_for_writing_guid_centric_brand_new_1_from_batch() + { + StoreOptions(opts => + { + opts.Events.UseIdentityMapForAggregates = true; + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }); + + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + + var aggregateQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + + var aggregate = await aggregateQuery; + + aggregate.ACount.ShouldBe(1); + aggregate.BCount.ShouldBe(2); + aggregate.CCount.ShouldBe(3); + } + [Fact] public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimization() { @@ -103,6 +183,33 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimiz aggregate.CCount.ShouldBe(3); } + [Fact] + public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimization_from_batch() + { + StoreOptions(opts => + { + 
//opts.Events.UseIdentityMapForAggregates = true; + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }); + + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + + var aggregateQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + var aggregate = await aggregateQuery; + + aggregate.ACount.ShouldBe(1); + aggregate.BCount.ShouldBe(2); + aggregate.CCount.ShouldBe(3); + } + [Fact] public async Task from_after_fetch_for_writing_guid_centric_brand_existing() @@ -132,6 +239,47 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_existing() document.DCount.ShouldBe(2); } + [Fact] + public async Task from_after_fetch_for_writing_guid_centric_brand_existing_from_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }); + + var target1 = Target.Random(); + var target2 = Target.Random(); + theSession.Store(target1, target2); + await theSession.SaveChangesAsync(); + + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + using var session = theStore.LightweightSession(); + var stream = await session.Events.FetchForWriting(streamId); + stream.AppendMany(new DEvent(), new DEvent()); + await session.SaveChangesAsync(); + + await using var query = theStore.LightweightSession(); + var batch = query.CreateBatchQuery(); + var targetQuery1 = batch.Load(target1.Id); + var documentQuery = batch.Events.FetchLatest(streamId); + var targetQuery2 = batch.Load(target2.Id); + await batch.Execute(); + + (await targetQuery1).ShouldNotBeNull(); + var document = await documentQuery; + (await targetQuery2).ShouldNotBeNull(); + + 
document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + document.DCount.ShouldBe(2); + } + [Fact] public async Task from_after_fetch_for_writing_string_centric_brand_new() { @@ -154,6 +302,34 @@ public async Task from_after_fetch_for_writing_string_centric_brand_new() aggregate.CCount.ShouldBe(3); } + [Fact] + public async Task from_after_fetch_for_writing_string_centric_brand_new_with_batch() + { + StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }); + + var streamId = Guid.NewGuid().ToString(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + + var aggregateQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + + var aggregate = await aggregateQuery; + + aggregate.ACount.ShouldBe(1); + aggregate.BCount.ShouldBe(2); + aggregate.CCount.ShouldBe(3); + } + [Fact] public async Task from_after_fetch_for_writing_string_centric_existing() { @@ -183,6 +359,38 @@ public async Task from_after_fetch_for_writing_string_centric_existing() document.DCount.ShouldBe(2); } + [Fact] + public async Task from_after_fetch_for_writing_string_centric_existing_with_batch() + { + StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }); + + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + using var session = theStore.LightweightSession(); + var stream = await session.Events.FetchForWriting(streamId); + stream.AppendMany(new DEvent(), new DEvent()); + await session.SaveChangesAsync(); + + await 
using var query = theStore.LightweightSession(); + var batch = query.CreateBatchQuery(); + var documentQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + var document = await documentQuery; + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + document.DCount.ShouldBe(2); + } + [Fact] public async Task fetch_latest_immutable_aggregate_running_inline() { @@ -238,6 +446,7 @@ public async Task fetch_latest_immutable_aggregate_running_inline_and_identity_m var aggregate2 = await session2.Events.FetchLatest(streamId); aggregate2.Name.ShouldBe("Random"); } + } public record CreatedEvent(Guid Id, string Name, Dictionary TestData); diff --git a/src/EventSourcingTests/FetchForWriting/fetch_latest_inline_aggregate.cs b/src/EventSourcingTests/FetchForWriting/fetch_latest_inline_aggregate.cs index 33c1918728..bdadf8b7a5 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_latest_inline_aggregate.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_latest_inline_aggregate.cs @@ -4,6 +4,7 @@ using JasperFx.Events; using Marten.Events; using Marten.Events.Projections; +using Marten.Testing.Documents; using Marten.Testing.Harness; using Shouldly; using Xunit; @@ -35,6 +36,34 @@ public async Task from_no_current_activity_guid_centric() document.CCount.ShouldBe(3); } + [Fact] + public async Task from_no_current_activity_guid_centric_in_batch() + { + StoreOptions(opts => + { + opts.Events.UseIdentityMapForAggregates = true; + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + await using var query = theStore.LightweightSession(); + + var batch = query.CreateBatchQuery(); + + var documentQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + var document = await documentQuery; + + 
document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + } + [Fact] public async Task from_no_current_activity_string_centric() @@ -59,6 +88,32 @@ public async Task from_no_current_activity_string_centric() document.CCount.ShouldBe(3); } + [Fact] + public async Task from_no_current_activity_string_centric_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + await using var query = theStore.LightweightSession(); + var batch = query.CreateBatchQuery(); + var documentQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + var document = await documentQuery; + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + } + [Fact] public async Task from_after_fetch_for_writing_guid_centric_brand_new() { @@ -132,6 +187,42 @@ public async Task from_after_fetch_for_writing_string_centric_brand_new() aggregate.CCount.ShouldBe(3); } + [Fact] + public async Task from_after_fetch_for_writing_string_centric_brand_new_in_batch() + { + StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + var target1 = Target.Random(); + var target2 = Target.Random(); + theSession.Store(target1, target2); + await theSession.SaveChangesAsync(); + + var streamId = Guid.NewGuid().ToString(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var targetQuery1 = batch.Load(target1.Id); + var 
aggregateQuery = batch.Events.FetchLatest(streamId); + var targetQuery2 = batch.Load(target2.Id); + await batch.Execute(); + + (await targetQuery1).ShouldNotBeNull(); + var aggregate = await aggregateQuery; + (await targetQuery2).ShouldNotBeNull(); + + aggregate.ACount.ShouldBe(1); + aggregate.BCount.ShouldBe(2); + aggregate.CCount.ShouldBe(3); + } + [Fact] public async Task from_after_fetch_for_writing_string_centric_existing() { @@ -162,4 +253,37 @@ public async Task from_after_fetch_for_writing_string_centric_existing() document.DCount.ShouldBe(2); } + [Fact] + public async Task from_after_fetch_for_writing_string_centric_existing_in_batch() + { + StoreOptions(opts => + { + opts.Events.UseIdentityMapForAggregates = true; + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + using var session = theStore.LightweightSession(); + var stream = await session.Events.FetchForWriting(streamId); + stream.AppendMany(new DEvent(), new DEvent()); + await session.SaveChangesAsync(); + + await using var query = theStore.LightweightSession(); + var batch = query.CreateBatchQuery(); + var documentQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + var document = await documentQuery; + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + document.DCount.ShouldBe(2); + } + } diff --git a/src/EventSourcingTests/FetchForWriting/fetch_latest_live_aggregate.cs b/src/EventSourcingTests/FetchForWriting/fetch_latest_live_aggregate.cs index f651ccdd4e..1923f7ca30 100644 --- a/src/EventSourcingTests/FetchForWriting/fetch_latest_live_aggregate.cs +++ b/src/EventSourcingTests/FetchForWriting/fetch_latest_live_aggregate.cs @@ -4,6 +4,7 @@ using 
JasperFx.Events; using Marten.Events; using Marten.Events.Projections; +using Marten.Testing.Documents; using Marten.Testing.Harness; using Shouldly; using Xunit; @@ -29,6 +30,38 @@ public async Task from_no_current_activity_guid_centric() document.CCount.ShouldBe(3); } + [Fact] + public async Task from_no_current_activity_guid_centric_in_batch() + { + var target1 = Target.Random(); + var target2 = Target.Random(); + theSession.Store(target1, target2); + await theSession.SaveChangesAsync(); + + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + await using var query = theStore.LightweightSession(); + var batch = query.CreateBatchQuery(); + + var targetQuery1 = batch.Load(target1.Id); + var documentQuery = batch.Events.FetchLatest(streamId); + var targetQuery2 = batch.Load(target2.Id); + + await batch.Execute(); + + (await targetQuery1).ShouldNotBeNull(); + var document = await documentQuery; + (await targetQuery2).ShouldNotBeNull(); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + } + [Fact] public async Task from_no_current_activity_string_centric() @@ -52,6 +85,31 @@ public async Task from_no_current_activity_string_centric() document.CCount.ShouldBe(3); } + [Fact] + public async Task from_no_current_activity_string_centric_in_batch() + { + StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + await using var query = theStore.LightweightSession(); + var batch = query.CreateBatchQuery(); + var documentQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + var document = await documentQuery; + + 
document.ACount.ShouldBe(1); + document.BCount.ShouldBe(2); + document.CCount.ShouldBe(3); + } + [Fact] public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimization() { @@ -68,6 +126,25 @@ public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimiz aggregate.CCount.ShouldBe(3); } + [Fact] + public async Task from_after_fetch_for_writing_guid_centric_brand_new_no_optimization_in_batch() + { + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.AppendMany(new AEvent(), new BEvent(), new BEvent(), new CEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var aggregateQuery = batch.Events.FetchLatest(streamId); + await batch.Execute(); + var aggregate = await aggregateQuery; + aggregate.ACount.ShouldBe(1); + aggregate.BCount.ShouldBe(2); + aggregate.CCount.ShouldBe(3); + } + [Fact] public async Task from_after_fetch_for_writing_guid_centric_brand_new_with_optimization() { diff --git a/src/EventSourcingTests/FetchForWriting/fetching_async_aggregates_for_writing.cs b/src/EventSourcingTests/FetchForWriting/fetching_async_aggregates_for_writing.cs index 55e0270555..4576bb1aaa 100644 --- a/src/EventSourcingTests/FetchForWriting/fetching_async_aggregates_for_writing.cs +++ b/src/EventSourcingTests/FetchForWriting/fetching_async_aggregates_for_writing.cs @@ -10,6 +10,7 @@ using Marten.Exceptions; using Marten.Schema; using Marten.Storage; +using Marten.Testing.Documents; using Marten.Testing.Harness; using Shouldly; using Xunit; @@ -49,6 +50,34 @@ public async Task fetch_new_stream_for_writing_Guid_identifier() document.CCount.ShouldBe(2); } + [Fact] + public async Task fetch_new_stream_for_writing_Guid_identifier_in_batch() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Async)); + + var streamId = Guid.NewGuid(); + + var stream = await 
theSession.Events.FetchForWriting(streamId); + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + + var documentQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + var document = (await documentQuery).Aggregate; + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_new_stream_for_writing_Guid_identifier_exception_handling() { @@ -104,6 +133,37 @@ public async Task fetch_existing_stream_for_writing_Guid_identifier() document.CCount.ShouldBe(2); } + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_in_batch() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Async)); + + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId); + + await batch.Execute(); + + var stream = await streamQuery; + + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + var document = stream.Aggregate; + + document.Id.ShouldBe(streamId); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_Guid_identifier_multi_tenanted() { @@ -159,6 +219,38 @@ public async Task fetch_new_stream_for_writing_string_identifier() document.CCount.ShouldBe(2); } + [Fact] + public async Task fetch_new_stream_for_writing_string_identifier_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + 
opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + + var documentQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + var document = (await documentQuery).Aggregate; + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier() { @@ -188,6 +280,41 @@ public async Task fetch_existing_stream_for_writing_string_identifier() document.CCount.ShouldBe(2); } + [Fact] + public async Task fetch_existing_stream_for_writing_string_identifier_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + + var streamQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + + var stream = await streamQuery; + + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + var document = stream.Aggregate; + + document.Id.ShouldBe(streamId); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier_multi_tenanted() { @@ -244,6 +371,37 @@ public async Task 
fetch_existing_stream_exclusively_happy_path_for_writing_Guid_ document.CCount.ShouldBe(2); } + [Fact] + public async Task fetch_existing_stream_exclusively_happy_path_for_writing_Guid_identifier_in_batch() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Async)); + + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + theSession.Logger = new TestOutputMartenLogger(_output); + + var batch = theSession.CreateBatchQuery(); + + var streamQuery = batch.Events.FetchForExclusiveWriting(streamId); + await batch.Execute(); + var stream = await streamQuery; + + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + var document = stream.Aggregate; + + document.Id.ShouldBe(streamId); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_Guid_identifier_sad_path() { @@ -295,6 +453,39 @@ public async Task fetch_existing_stream_exclusively_happy_path_for_writing_strin document.CCount.ShouldBe(2); } + + [Fact] + public async Task fetch_existing_stream_exclusively_happy_path_for_writing_string_identifier_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForExclusiveWriting(streamId); + await batch.Execute(); + var stream = await streamQuery; + + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + var document = stream.Aggregate; + + 
document.Id.ShouldBe(streamId); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier_sad_path() { @@ -322,13 +513,28 @@ await Should.ThrowAsync(async () => } + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Async)); + var streamId = Guid.NewGuid(); + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + var stream = await theSession.Events.FetchForWriting(streamId, 6); + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); - [Fact] - public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version() + stream.AppendOne(new EEvent()); + await theSession.SaveChangesAsync(); + } + + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_in_batch() { StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Async)); @@ -339,7 +545,11 @@ public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expecte new CEvent(), new CEvent()); await theSession.SaveChangesAsync(); - var stream = await theSession.Events.FetchForWriting(streamId, 6); + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldNotBeNull(); stream.CurrentVersion.ShouldBe(6); @@ -395,6 +605,39 @@ await Should.ThrowAsync(async () => }); } + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_sad_path_on_save_changes_from_batch() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Async)); + + + var streamId = Guid.NewGuid(); + + 
theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + // This should be fine + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + await batch.Execute(); + var stream = await streamQuery; + stream.AppendOne(new EEvent()); + + // Get in between and run other events in a different session + await using (var otherSession = theStore.LightweightSession()) + { + otherSession.Events.Append(streamId, new EEvent()); + await otherSession.SaveChangesAsync(); + } + + // The version is now off + await Should.ThrowAsync(async () => + { + await theSession.SaveChangesAsync(); + }); + } + [Fact] @@ -420,6 +663,34 @@ public async Task fetch_existing_stream_for_writing_string_identifier_with_expec await theSession.SaveChangesAsync(); } + + + [Fact] + public async Task fetch_existing_stream_for_writing_string_identifier_with_expected_version_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + stream.AppendOne(new EEvent()); + await theSession.SaveChangesAsync(); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier_with_expected_version_immediate_sad_path() { @@ -441,6 +712,31 @@ await Should.ThrowAsync(async () => }); } + [Fact] + public async Task 
fetch_existing_stream_for_writing_string_identifier_with_expected_version_immediate_sad_path_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + await Should.ThrowAsync(async () => + { + var batch = theSession.CreateBatchQuery(); + + var streamQuery = batch.Events.FetchForWriting(streamId, 5); + await batch.Execute(); + var stream = await streamQuery; + }); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier_with_expected_version_sad_path_on_save_changes() { @@ -474,6 +770,42 @@ await Should.ThrowAsync(async () => }); } + [Fact] + public async Task fetch_existing_stream_for_writing_string_identifier_with_expected_version_sad_path_on_save_changes_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + // This should be fine + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + await batch.Execute(); + var stream = await streamQuery; + stream.AppendOne(new EEvent()); + + // Get in between and run other events in a different session + await using (var otherSession = theStore.LightweightSession()) + { + otherSession.Events.Append(streamId, new EEvent()); + await otherSession.SaveChangesAsync(); + } + + // The version is now off + await Should.ThrowAsync(async () => + { + await theSession.SaveChangesAsync(); + }); + } + [Fact] public async Task 
fetch_aggregate_that_is_completely_caught_up() { @@ -500,6 +832,47 @@ public async Task fetch_aggregate_that_is_completely_caught_up() await theSession.SaveChangesAsync(); } + [Fact] + public async Task fetch_aggregate_that_is_completely_caught_up_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var target1 = Target.Random(); + var target2 = Target.Random(); + theSession.Store(target1, target2); + await theSession.SaveChangesAsync(); + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.RebuildProjectionAsync(CancellationToken.None); + + var batch = theSession.CreateBatchQuery(); + var targetQuery1 = batch.Load(target1.Id); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + var targetQuery2 = batch.Load(target2.Id); + + await batch.Execute(); + + (await targetQuery1).ShouldNotBeNull(); + var stream = await streamQuery; + (await targetQuery2).ShouldNotBeNull(); + + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + stream.AppendOne(new EEvent()); + await theSession.SaveChangesAsync(); + } + [Fact] public async Task fetch_aggregate_that_is_completely_caught_up_with_no_version_supplied() { @@ -526,7 +899,34 @@ public async Task fetch_aggregate_that_is_completely_caught_up_with_no_version_s await theSession.SaveChangesAsync(); } + [Fact] + public async Task fetch_aggregate_that_is_completely_caught_up_with_no_version_supplied_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), 
new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.RebuildProjectionAsync(CancellationToken.None); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + stream.AppendOne(new EEvent()); + await theSession.SaveChangesAsync(); + } [Fact] public async Task fetch_aggregate_that_is_completely_caught_up_use_exclusive_locks() @@ -597,6 +997,54 @@ public async Task fetch_aggregate_that_is_in_progress() stream.Aggregate.BCount.ShouldBe(6); } + [Fact] + public async Task fetch_aggregate_that_is_in_progress_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Async); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.RebuildProjectionAsync(CancellationToken.None); + await daemon.StopAllAsync(); + + var existing = await theSession.LoadAsync(streamId); + + var stream = await theSession.Events.FetchForWriting(streamId, 6); + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + existing = await theSession.LoadAsync(streamId); + + stream.AppendOne(new EEvent()); + await theSession.SaveChangesAsync(); + + using (var session = theStore.LightweightSession()) + { + session.Events.Append(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent()); + await session.SaveChangesAsync(); + } + + using var session2 = theStore.LightweightSession(); + var batch = session2.CreateBatchQuery(); + var streamQuery = 
batch.Events.FetchForWriting(streamId, 11); + await batch.Execute(); + stream = await streamQuery; + stream = await theSession.Events.FetchForWriting(streamId, 11); + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(11); + + stream.Aggregate.BCount.ShouldBe(6); + } + [Fact] public async Task override_the_optimistic_concurrency_on_projected_document() { @@ -624,6 +1072,37 @@ public async Task override_the_optimistic_concurrency_on_projected_document() Assert.Equal("foo", result.Aggregate.Name); } + + [Fact] + public async Task override_the_optimistic_concurrency_on_projected_document_in_batch() + { + StoreOptions(opts => + { + opts.Events.StreamIdentity = StreamIdentity.AsString; + opts.Projections.Snapshot(SnapshotLifecycle.Async); + }); + + var streamKey = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamKey, new NamedEvent("foo"), new EventB()); + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.RebuildProjectionAsync(CancellationToken.None); + + var batch = theSession.CreateBatchQuery(); + var resultQuery = batch.Events.FetchForWriting(streamKey); + await batch.Execute(); + var result = await resultQuery; + + Assert.Equal(2, result.CurrentVersion); + Assert.Equal(2, result.StartingVersion); + Assert.NotNull(result.Aggregate); + Assert.Equal(streamKey, result.Aggregate.StreamKey); + // TODO: There is a weird bug here where ~25% of the time this is set to null. Seems to happen intermittently across all frameworks. No idea why. 
+ Assert.Equal("foo", result.Aggregate.Name); + } + } public record NamedEvent(string Name); diff --git a/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs b/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs index 4733b6d48c..af96268e1d 100644 --- a/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs +++ b/src/EventSourcingTests/FetchForWriting/fetching_inline_aggregates_for_writing.cs @@ -11,6 +11,7 @@ using Marten.Exceptions; using Marten.Schema; using Marten.Storage; +using Marten.Testing.Documents; using Marten.Testing.Harness; using Microsoft.Extensions.Hosting; using Shouldly; @@ -77,6 +78,37 @@ public async Task revision_is_updated_after_quick_appending_with_IRevisioned() document.Version.ShouldBe(6); } + [Fact] + public async Task revision_is_updated_after_quick_appending_with_IRevisioned_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + opts.Events.AppendMode = EventAppendMode.Quick; + opts.Events.UseIdentityMapForAggregates = true; + }); + + var streamId = Guid.NewGuid(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + + var stream = await streamQuery; + + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var document = await theSession.LoadAsync(streamId); + document.Version.ShouldBe(6); + } + [Fact] public async Task revision_is_updated_after_quick_appending_with_custom_mapped_version() { @@ -108,6 +140,41 @@ public async Task revision_is_updated_after_quick_appending_with_custom_mapped_v document.Version.ShouldBe(6); } + + [Fact] + public async Task 
revision_is_updated_after_quick_appending_with_custom_mapped_version_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Inline).Metadata(m => + { + m.Revision.MapTo(x => x.Version); + }); + + opts.Events.AppendMode = EventAppendMode.Quick; + opts.Events.UseIdentityMapForAggregates = true; + }); + + var streamId = Guid.NewGuid(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + theSession.Logger = new TestOutputMartenLogger(_output); + await theSession.SaveChangesAsync(); + + var document = await theSession.LoadAsync(streamId); + document.Version.ShouldBe(6); + } + [Fact] public async Task fetch_new_stream_for_writing_Guid_identifier_exception_handling() { @@ -137,6 +204,39 @@ await Should.ThrowAsync(async () => document.CCount.ShouldBe(2); } + + [Fact] + public async Task fetch_new_stream_for_writing_Guid_identifier_exception_handling_in_batch() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline)); + + var streamId = Guid.NewGuid(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var sameStream = theSession.Events.StartStream(streamId, new AEvent()); + await Should.ThrowAsync(async () => + { + await theSession.SaveChangesAsync(); + }); + + var document = await 
theSession.Events.AggregateStreamAsync(streamId); + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_Guid_identifier() { @@ -161,6 +261,43 @@ public async Task fetch_existing_stream_for_writing_Guid_identifier() document.CCount.ShouldBe(2); } + + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_in_batch() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline)); + + var target1 = Target.Random(); + var target2 = Target.Random(); + theSession.Store(target1, target2); + await theSession.SaveChangesAsync(); + + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var targetQuery1 = batch.Load(target1.Id); + var streamQuery = batch.Events.FetchForWriting(streamId); + var targetQuery2 = batch.Load(target2.Id); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + (await targetQuery1).ShouldNotBeNull(); + var document = stream.Aggregate; + (await targetQuery2).ShouldNotBeNull(); + + document.Id.ShouldBe(streamId); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_Guid_identifier_multi_tenanted() { @@ -216,6 +353,36 @@ public async Task fetch_new_stream_for_writing_string_identifier() document.CCount.ShouldBe(2); } + [Fact] + public async Task fetch_new_stream_for_writing_string_identifier_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + var streamId = Guid.NewGuid().ToString(); + + var batch = 
theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var document = await theSession.Events.AggregateStreamAsync(streamId); + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier() { @@ -245,6 +412,38 @@ public async Task fetch_existing_stream_for_writing_string_identifier() document.CCount.ShouldBe(2); } + [Fact] + public async Task fetch_existing_stream_for_writing_string_identifier_in_batch() + { + StoreOptions(opts => + { + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + var document = stream.Aggregate; + + document.Id.ShouldBe(streamId); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier_multi_tenanted() { @@ -427,6 +626,32 @@ await Should.ThrowAsync(async () => }); } + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_using_identity_map_immediate_sad_path_in_batch() + { + StoreOptions( + opts 
=> + { + opts.Events.UseIdentityMapForAggregates = true; + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + + + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + await Should.ThrowAsync(async () => + { + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 5); + await batch.Execute(); + await streamQuery; + }); + } + [Fact] public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_using_identity_map_sad_path_on_save_changes() { @@ -482,6 +707,28 @@ public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expecte await theSession.SaveChangesAsync(); } + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_in_batch() + { + StoreOptions(opts => opts.Projections.Snapshot(SnapshotLifecycle.Inline)); + + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + stream.AppendOne(new EEvent()); + await theSession.SaveChangesAsync(); + } + [Fact] public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_immediate_sad_path() { diff --git a/src/EventSourcingTests/FetchForWriting/fetching_live_aggregates_for_writing.cs b/src/EventSourcingTests/FetchForWriting/fetching_live_aggregates_for_writing.cs index b2c6e3b350..6ae40ff44d 100644 --- a/src/EventSourcingTests/FetchForWriting/fetching_live_aggregates_for_writing.cs +++ 
b/src/EventSourcingTests/FetchForWriting/fetching_live_aggregates_for_writing.cs @@ -15,6 +15,7 @@ using Marten.Events.Projections; using Marten.Exceptions; using Marten.Schema.Identity; +using Marten.Testing.Documents; using Marten.Testing.Harness; using Shouldly; using Xunit; @@ -54,6 +55,30 @@ public async Task fetch_new_stream_for_writing_Guid_identifier() document.CCount.ShouldBe(2); } + [Fact] + public async Task fetch_new_stream_for_writing_Guid_identifier_in_batch() + { + var streamId = Guid.NewGuid(); + + var stream = await theSession.Events.FetchForWriting(streamId); + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var documentQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + var document = (await documentQuery).Aggregate; + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_Guid_identifier() { @@ -76,6 +101,44 @@ public async Task fetch_existing_stream_for_writing_Guid_identifier() document.CCount.ShouldBe(2); } + + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_in_batch() + { + var target1 = Target.Random(); + var target2 = Target.Random(); + theSession.Store(target1, target2); + await theSession.SaveChangesAsync(); + + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + + var batch = theSession.CreateBatchQuery(); + var targetQuery1 = batch.Load(target1.Id); + var streamQuery = batch.Events.FetchForWriting(streamId); + var targetQuery2 = batch.Load(target2.Id); + await batch.Execute(); + + 
(await targetQuery1).ShouldNotBeNull(); + var stream = await streamQuery; + (await targetQuery2).ShouldNotBeNull(); + + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + var document = stream.Aggregate; + + document.Id.ShouldBe(streamId); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_new_stream_for_writing_string_identifier() { @@ -99,6 +162,33 @@ public async Task fetch_new_stream_for_writing_string_identifier() document.CCount.ShouldBe(2); } + + [Fact] + public async Task fetch_new_stream_for_writing_string_identifier_in_batch() + { + UseStreamIdentity(StreamIdentity.AsString); + + var streamId = Guid.NewGuid().ToString(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldBeNull(); + stream.CurrentVersion.ShouldBe(0); + + stream.AppendOne(new AEvent()); + stream.AppendMany(new BEvent(), new BEvent(), new BEvent()); + stream.AppendMany(new CEvent(), new CEvent()); + + await theSession.SaveChangesAsync(); + + var document = await theSession.Events.AggregateStreamAsync(streamId); + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier() { @@ -145,6 +235,32 @@ public async Task fetch_existing_stream_exclusively_happy_path_for_writing_Guid_ document.CCount.ShouldBe(2); } + + [Fact] + public async Task fetch_existing_stream_exclusively_happy_path_for_writing_Guid_identifier_in_batch() + { + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForExclusiveWriting(streamId); 
+ await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + var document = stream.Aggregate; + + document.Id.ShouldBe(streamId); + + document.ACount.ShouldBe(1); + document.BCount.ShouldBe(3); + document.CCount.ShouldBe(2); + } + [Fact] public async Task fetch_existing_stream_for_writing_Guid_identifier_sad_path() { @@ -227,6 +343,28 @@ public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expecte await theSession.SaveChangesAsync(); } + + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_in_batch() + { + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + await batch.Execute(); + var stream = await streamQuery; + + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + stream.AppendOne(new EEvent()); + await theSession.SaveChangesAsync(); + } + [Fact] public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_immediate_sad_path() { @@ -242,6 +380,25 @@ await Should.ThrowAsync(async () => }); } + + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_immediate_sad_path_in_batch() + { + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + await Should.ThrowAsync(async () => + { + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 5); + await batch.Execute(); + var stream = await streamQuery; + }); + } + [Fact] public async Task 
fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_sad_path_on_save_changes() { @@ -269,6 +426,36 @@ await Should.ThrowAsync(async () => }); } + [Fact] + public async Task fetch_existing_stream_for_writing_Guid_identifier_with_expected_version_sad_path_on_save_changes_in_batch() + { + var streamId = Guid.NewGuid(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + // This should be fine + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + await batch.Execute(); + var stream = await streamQuery; + stream.AppendOne(new EEvent()); + + // Get in between and run other events in a different session + await using (var otherSession = theStore.LightweightSession()) + { + otherSession.Events.Append(streamId, new EEvent()); + await otherSession.SaveChangesAsync(); + } + + // The version is now off + await Should.ThrowAsync(async () => + { + await theSession.SaveChangesAsync(); + }); + } + [Fact] public async Task helpful_exception_when_id_type_is_mismatched_1() @@ -319,6 +506,28 @@ public async Task fetch_existing_stream_for_writing_string_identifier_with_expec await theSession.SaveChangesAsync(); } + [Fact] + public async Task fetch_existing_stream_for_writing_string_identifier_with_expected_version_in_batch() + { + UseStreamIdentity(StreamIdentity.AsString); + + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + await batch.Execute(); + var stream = await streamQuery; + stream.Aggregate.ShouldNotBeNull(); + stream.CurrentVersion.ShouldBe(6); + + stream.AppendOne(new EEvent()); + await 
theSession.SaveChangesAsync(); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier_with_expected_version_immediate_sad_path() { @@ -335,6 +544,25 @@ await Should.ThrowAsync(async () => }); } + [Fact] + public async Task fetch_existing_stream_for_writing_string_identifier_with_expected_version_immediate_sad_path_in_batch() + { + UseStreamIdentity(StreamIdentity.AsString); + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + await Should.ThrowAsync(async () => + { + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 5); + await batch.Execute(); + var stream = await streamQuery; + }); + } + [Fact] public async Task fetch_existing_stream_for_writing_string_identifier_with_expected_version_sad_path_on_save_changes() { @@ -363,6 +591,37 @@ await Should.ThrowAsync(async () => }); } + [Fact] + public async Task fetch_existing_stream_for_writing_string_identifier_with_expected_version_sad_path_on_save_changes_in_batch() + { + UseStreamIdentity(StreamIdentity.AsString); + var streamId = Guid.NewGuid().ToString(); + + theSession.Events.StartStream(streamId, new AEvent(), new BEvent(), new BEvent(), new BEvent(), + new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + // This should be fine + var batch = theSession.CreateBatchQuery(); + var streamQuery = batch.Events.FetchForWriting(streamId, 6); + await batch.Execute(); + var stream = await streamQuery; + stream.AppendOne(new EEvent()); + + // Get in between and run other events in a different session + await using (var otherSession = theStore.LightweightSession()) + { + otherSession.Events.Append(streamId, new EEvent()); + await otherSession.SaveChangesAsync(); + } + + // The version is now off + await Should.ThrowAsync(async () => + { + await 
theSession.SaveChangesAsync(); + }); + } + [Fact] public async Task using_a_custom_projection_fetch_latest() { diff --git a/src/EventSourcingTests/StubEventStreamTests.cs b/src/EventSourcingTests/StubEventStreamTests.cs new file mode 100644 index 0000000000..5684e90d5c --- /dev/null +++ b/src/EventSourcingTests/StubEventStreamTests.cs @@ -0,0 +1,26 @@ +using EventSourcingTests.Projections; +using Marten.Events; +using Shouldly; +using Xunit; + +namespace EventSourcingTests; + +public class StubEventStreamTests +{ + [Fact] + public void append_one() + { + var stream = new StubEventStream(new QuestParty()); + stream.AppendOne(new QuestStarted()); + + stream.EventsAppended.Count.ShouldBe(1); + } + + [Fact] + public void append_many() + { + var stream = new StubEventStream(new QuestParty()); + stream.AppendMany(new QuestStarted(), new MonsterDefeated()); + stream.EventsAppended.Count.ShouldBe(2); + } +} diff --git a/src/Marten/Events/EventStore.FetchForWriting.cs b/src/Marten/Events/EventStore.FetchForWriting.cs index 567381f153..2d158c04a6 100644 --- a/src/Marten/Events/EventStore.FetchForWriting.cs +++ b/src/Marten/Events/EventStore.FetchForWriting.cs @@ -7,6 +7,7 @@ using JasperFx.Core; using JasperFx.Core.Reflection; using JasperFx.Events; +using JasperFx.Events.Projections; using Marten.Internal; using Marten.Internal.Sessions; using Marten.Internal.Storage; @@ -59,6 +60,30 @@ IQueryHandler> IEventIdentityStrategy.BuildEventQuer return new ListQueryHandler(statement, selector); } + IQueryHandler> IEventIdentityStrategy.BuildEventQueryHandler(Guid id, ISqlFragment? filter) + { + var selector = _store.Events.EnsureAsGuidStorage(_session); + var statement = new EventStatement(selector) { StreamId = id, TenantId = _tenant.TenantId }; + if (filter != null) + { + statement.Filters = [filter]; + } + + return new ListQueryHandler(statement, selector); + } + + IQueryHandler> IEventIdentityStrategy.BuildEventQueryHandler(string id, ISqlFragment? 
filter) + { + var selector = _store.Events.EnsureAsStringStorage(_session); + var statement = new EventStatement(selector) { StreamKey = id, TenantId = _tenant.TenantId }; + if (filter != null) + { + statement.Filters = [filter]; + } + + return new ListQueryHandler(statement, selector); + } + async Task IEventIdentityStrategy.EnsureEventStorageExists( DocumentSessionBase session, CancellationToken cancellation) { @@ -100,58 +125,58 @@ IQueryHandler> IEventIdentityStrategy.BuildEventQu public Task> FetchForWriting(Guid id, CancellationToken cancellation = default) where T : class { - var plan = findFetchPlan(); + var plan = FindFetchPlan(); return plan.FetchForWriting(_session, id, false, cancellation); } public Task> FetchForWriting(string key, CancellationToken cancellation = default) where T : class { - var plan = findFetchPlan(); + var plan = FindFetchPlan(); return plan.FetchForWriting(_session, key, false, cancellation); } public Task> FetchForWriting(Guid id, long initialVersion, CancellationToken cancellation = default) where T : class { - var plan = findFetchPlan(); + var plan = FindFetchPlan(); return plan.FetchForWriting(_session, id, initialVersion, cancellation); } public Task> FetchForWriting(string key, long initialVersion, CancellationToken cancellation = default) where T : class { - var plan = findFetchPlan(); + var plan = FindFetchPlan(); return plan.FetchForWriting(_session, key, initialVersion, cancellation); } public Task> FetchForExclusiveWriting(Guid id, CancellationToken cancellation = default) where T : class { - var plan = findFetchPlan(); + var plan = FindFetchPlan(); return plan.FetchForWriting(_session, id, true, cancellation); } public Task> FetchForExclusiveWriting(string key, CancellationToken cancellation = default) where T : class { - var plan = findFetchPlan(); + var plan = FindFetchPlan(); return plan.FetchForWriting(_session, key, true, cancellation); } public ValueTask FetchLatest(Guid id, CancellationToken cancellation = 
default) where T : class { - var plan = findFetchPlan(); + var plan = FindFetchPlan(); return plan.FetchForReading(_session, id, cancellation); } public ValueTask FetchLatest(string id, CancellationToken cancellation = default) where T : class { - var plan = findFetchPlan(); + var plan = FindFetchPlan(); return plan.FetchForReading(_session, id, cancellation); } - private IAggregateFetchPlan findFetchPlan() where TDoc : class where TId : notnull + internal IAggregateFetchPlan FindFetchPlan() where TDoc : class where TId : notnull { if (typeof(TId) == typeof(Guid)) { @@ -191,6 +216,8 @@ private IAggregateFetchPlan determineFetchPlan(IDocumentSt public interface IAggregateFetchPlan where TDoc : notnull { + ProjectionLifecycle Lifecycle { get; } + Task> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, CancellationToken cancellation = default); @@ -198,6 +225,14 @@ Task> FetchForWriting(DocumentSessionBase session, TId id, lo CancellationToken cancellation = default); ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation); + + // These two methods are for batching + IQueryHandler> BuildQueryHandler(QuerySession session, TId id, + long expectedStartingVersion); + + IQueryHandler> BuildQueryHandler(QuerySession session, TId id, bool forUpdate); + + IQueryHandler BuildQueryHandler(QuerySession session, TId id); } public interface IEventIdentityStrategy @@ -213,4 +248,7 @@ IEventStream AppendToStream(TDoc? document, DocumentSessionBase sess IQueryHandler> BuildEventQueryHandler(TId id, IEventStorage eventStorage, ISqlFragment? filter = null); + + IQueryHandler> BuildEventQueryHandler(TId id, + ISqlFragment? 
filter = null); } diff --git a/src/Marten/Events/Fetching/AsyncFetchPlanner.cs b/src/Marten/Events/Fetching/AsyncFetchPlanner.cs new file mode 100644 index 0000000000..0efe3ee97f --- /dev/null +++ b/src/Marten/Events/Fetching/AsyncFetchPlanner.cs @@ -0,0 +1,47 @@ +using System; +using System.Diagnostics.CodeAnalysis; +using JasperFx.Core.Reflection; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten.Events.Projections; +using Marten.Internal.Storage; +using Marten.Schema; + +namespace Marten.Events.Fetching; + +internal class AsyncFetchPlanner: IFetchPlanner +{ + public bool TryMatch(IDocumentStorage storage, IEventIdentityStrategy identity, StoreOptions options, + [NotNullWhen(true)]out IAggregateFetchPlan? plan) where TDoc : class where TId : notnull + { + if (options.Projections.TryFindAggregate(typeof(TDoc), out var projection)) + { + if (projection is MultiStreamProjection) + { + throw new InvalidOperationException( + $"The aggregate type {typeof(TDoc).FullNameInCode()} is the subject of a multi-stream projection and cannot be used with FetchForWriting"); + } + + + + if (projection.Scope == AggregationScope.MultiStream) + { + throw new InvalidOperationException( + $"The aggregate type {typeof(TDoc).FullNameInCode()} is the subject of a multi-stream projection and cannot be used with FetchForWriting"); + } + + if (projection.Lifecycle == ProjectionLifecycle.Async) + { + var mapping = options.Storage.FindMapping(typeof(TDoc)) as DocumentMapping; + if (mapping != null && mapping.Metadata.Revision.Enabled) + { + plan = new FetchAsyncPlan(options.EventGraph, identity, storage); + return true; + } + } + } + + plan = default; + return false; + } +} diff --git a/src/Marten/Events/Fetching/FetchAsyncPlan.ExpectedVersion.cs b/src/Marten/Events/Fetching/FetchAsyncPlan.ExpectedVersion.cs new file mode 100644 index 0000000000..b87eedeb0c --- /dev/null +++ b/src/Marten/Events/Fetching/FetchAsyncPlan.ExpectedVersion.cs @@ -0,0 +1,172 @@ +using 
System; +using System.Data.Common; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Core; +using JasperFx.Events; +using Marten.Exceptions; +using Marten.Internal; +using Marten.Internal.Sessions; +using Marten.Linq.QueryHandlers; +using Npgsql; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal partial class FetchAsyncPlan +{ + public async Task> FetchForWriting(DocumentSessionBase session, TId id, long expectedStartingVersion, + CancellationToken cancellation = default) + { + await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); + await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); + + var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) + .ConfigureAwait(false); + + ensureInitialSql(selector); + // TODO -- use read only transaction???? + + var builder = new BatchBuilder{TenantId = session.TenantId}; + _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, false); + + builder.StartNewCommand(); + + var loadHandler = new LoadByIdHandler(_storage, id); + loadHandler.ConfigureCommand(builder, session); + + builder.StartNewCommand(); + + writeEventFetchStatement(id, builder); + + var batch = builder.Compile(); + await using var reader = + await session.ExecuteReaderAsync(batch, cancellation).ConfigureAwait(false); + + return await ReadIntoStream(session, id, expectedStartingVersion, cancellation, reader, loadHandler, selector).ConfigureAwait(false); + } + + private async Task> ReadIntoStream(DocumentSessionBase session, TId id, long expectedStartingVersion, + CancellationToken cancellation, DbDataReader reader, LoadByIdHandler loadHandler, IEventStorage selector) + { + long version = 0; + try + { + // Read the latest version + if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) + { + version = await 
reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); + } + + if (expectedStartingVersion != version) + { + throw new ConcurrencyException( + $"Expected the existing version to be {expectedStartingVersion}, but was {version}", + typeof(TDoc), id); + } + + // Fetch the existing aggregate -- if any! + await reader.NextResultAsync(cancellation).ConfigureAwait(false); + var document = await loadHandler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + + // Read in any events from after the current state of the aggregate + await reader.NextResultAsync(cancellation).ConfigureAwait(false); + var events = await new ListQueryHandler(null, selector).HandleAsync(reader, session, cancellation).ConfigureAwait(false); + if (events.Any()) + { + document = await _aggregator.BuildAsync(events, session, document, id, _storage, cancellation).ConfigureAwait(false); + } + + if (document != null) + { + _storage.SetIdentity(document, id); + } + + var stream = version == 0 + ? _identityStrategy.StartStream(document, session, id, cancellation) + : _identityStrategy.AppendToStream(document, session, id, version, cancellation); + + // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest + if (session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, stream); + } + + return stream; + } + catch (Exception e) + { + if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) + { + throw new StreamLockedException(id, e.InnerException); + } + + if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) + { + throw new StreamLockedException(id, e.InnerException!); + } + + throw; + } + } + + public IQueryHandler> BuildQueryHandler(QuerySession session, TId id, long expectedStartingVersion) + { + var dsb = session.AssertIsDocumentSession(); + if (_initialSql.IsEmpty()) + { + ensureInitialSql(dsb.EventStorage()); + } + return new 
ExpectedVersionQueryHandler(this, id, expectedStartingVersion); + } + + public class ExpectedVersionQueryHandler: IQueryHandler> + { + private readonly FetchAsyncPlan _parent; + private readonly TId _id; + private readonly long _expectedStartingVersion; + private readonly LoadByIdHandler _loadHandler; + + public ExpectedVersionQueryHandler(FetchAsyncPlan parent, TId id, long expectedStartingVersion) + { + _parent = parent; + _id = id; + _expectedStartingVersion = expectedStartingVersion; + + _loadHandler = new LoadByIdHandler(_parent._storage, id); + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + _parent._identityStrategy.BuildCommandForReadingVersionForStream(builder, _id, false); + + builder.StartNewCommand(); + + _loadHandler.ConfigureCommand(builder, session); + + builder.StartNewCommand(); + + _parent.writeEventFetchStatement(_id, builder); + } + + public Task> HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + var documentSessionBase = (DocumentSessionBase)session; + return _parent.ReadIntoStream(documentSessionBase, _id, _expectedStartingVersion, token, reader, + _loadHandler, documentSessionBase.EventStorage()); + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotSupportedException(); + } + + public IEventStream Handle(DbDataReader reader, IMartenSession session) + { + throw new NotSupportedException(); + } + } +} diff --git a/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs b/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs new file mode 100644 index 0000000000..504a10befe --- /dev/null +++ b/src/Marten/Events/Fetching/FetchAsyncPlan.ForReading.cs @@ -0,0 +1,132 @@ +using System; +using System.Data.Common; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using Marten.Internal; +using Marten.Internal.Sessions; +using 
Marten.Linq.QueryHandlers; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal partial class FetchAsyncPlan +{ + public async ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation) + { + // Optimization for having called FetchForWriting, then FetchLatest on same session in short order + if (session.Options.Events.UseIdentityMapForAggregates) + { + if (session.TryGetAggregateFromIdentityMap, TId>(id, out var stream)) + { + var starting = stream.Aggregate; + var appendedEvents = stream.Events; + + return await _aggregator.BuildAsync(appendedEvents, session, starting, id, _storage, cancellation).ConfigureAwait(false); + } + } + + await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); + await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); + + var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) + .ConfigureAwait(false); + + _initialSql ??= + $"select {selector.SelectFields().Select(x => "d." + x).Join(", ")} from {_events.DatabaseSchemaName}.mt_events as d"; + + // TODO -- use read only transaction???? + + var builder = new BatchBuilder{TenantId = session.TenantId}; + + var loadHandler = new LoadByIdHandler(_storage, id); + loadHandler.ConfigureCommand(builder, session); + + builder.StartNewCommand(); + + writeEventFetchStatement(id, builder); + + var batch = builder.Compile(); + await using var reader = + await session.ExecuteReaderAsync(batch, cancellation).ConfigureAwait(false); + + return await readLatest(session, id, cancellation, loadHandler, reader, selector).ConfigureAwait(false); + } + + private async Task readLatest(DocumentSessionBase session, TId id, CancellationToken cancellation, + LoadByIdHandler loadHandler, DbDataReader reader, IEventStorage selector) + { + // Fetch the existing aggregate -- if any! 
+ var document = await loadHandler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + + // Read in any events from after the current state of the aggregate + await reader.NextResultAsync(cancellation).ConfigureAwait(false); + var events = await new ListQueryHandler(null, selector).HandleAsync(reader, session, cancellation).ConfigureAwait(false); + if (events.Any()) + { + document = await _aggregator.BuildAsync(events, session, document, id, _storage, cancellation).ConfigureAwait(false); + } + + if (document != null) + { + _storage.SetIdentity(document, id); + } + + return document; + } + + + public IQueryHandler BuildQueryHandler(QuerySession session, TId id) + { + if (_initialSql.IsEmpty()) + { + ensureInitialSql(session.EventStorage()); + } + + return new QueryHandler(this, id); + } + + public class QueryHandler: IQueryHandler + { + private readonly FetchAsyncPlan _parent; + private readonly TId _id; + private readonly LoadByIdHandler _loadHandler; + + public QueryHandler(FetchAsyncPlan parent, TId id) + { + _parent = parent; + _id = id; + + _loadHandler = new LoadByIdHandler(parent._storage, id); + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + _loadHandler.ConfigureCommand(builder, session); + + builder.StartNewCommand(); + + _parent.writeEventFetchStatement(_id, builder); + } + + public Task HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + var documentSessionBase = (DocumentSessionBase)session; + var eventStorage = documentSessionBase.EventStorage(); + return _parent.readLatest(documentSessionBase, _id, token, _loadHandler, reader, eventStorage); + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotSupportedException(); + } + + public TDoc? 
Handle(DbDataReader reader, IMartenSession session) + { + throw new NotSupportedException(); + } + } +} diff --git a/src/Marten/Events/Fetching/FetchAsyncPlan.ForUpdate.cs b/src/Marten/Events/Fetching/FetchAsyncPlan.ForUpdate.cs new file mode 100644 index 0000000000..a564447f5f --- /dev/null +++ b/src/Marten/Events/Fetching/FetchAsyncPlan.ForUpdate.cs @@ -0,0 +1,217 @@ +using System; +using System.Data.Common; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core; +using JasperFx.Events; +using Marten.Exceptions; +using Marten.Internal; +using Marten.Internal.Sessions; +using Marten.Linq.QueryHandlers; +using Npgsql; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal partial class FetchAsyncPlan +{ + + [MemberNotNull(nameof(_initialSql))] + public async Task> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, CancellationToken cancellation = default) + { + await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); + await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); + + var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) + .ConfigureAwait(false); + + ensureInitialSql(selector); + + if (forUpdate) + { + await session.BeginTransactionAsync(cancellation).ConfigureAwait(false); + } + + var builder = new BatchBuilder{TenantId = session.TenantId}; + if (!forUpdate) + { + builder.Append("begin transaction isolation level repeatable read read only"); + builder.StartNewCommand(); + } + + _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, forUpdate); + + builder.StartNewCommand(); + + var loadHandler = new LoadByIdHandler(_storage, id); + loadHandler.ConfigureCommand(builder, session); + + builder.StartNewCommand(); + + writeEventFetchStatement(id, builder); + + if (!forUpdate) + { + 
builder.StartNewCommand(); + builder.Append("end"); + } + + var batch = builder.Compile(); + try + { + await using var reader = + await session.ExecuteReaderAsync(batch, cancellation).ConfigureAwait(false); + + return await ReadIntoStream(session, id, cancellation, reader, loadHandler, selector).ConfigureAwait(false); + } + catch (Exception e) + { + if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) + { + throw new StreamLockedException(id, e.InnerException); + } + + if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) + { + throw new StreamLockedException(id, e.InnerException); + } + + throw; + } + } + + private void ensureInitialSql(IEventStorage selector) + { + _initialSql ??= + $"select {selector.SelectFields().Select(x => "d." + x).Join(", ")} from {_events.DatabaseSchemaName}.mt_events as d"; + } + + private async Task> ReadIntoStream(DocumentSessionBase session, TId id, CancellationToken cancellation, + DbDataReader reader, LoadByIdHandler loadHandler, IEventStorage selector) + { + long version = 0; + try + { + // Read the latest version + if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) + { + version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); + } + + // Fetch the existing aggregate -- if any! 
+ await reader.NextResultAsync(cancellation).ConfigureAwait(false); + var document = await loadHandler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + + // Read in any events from after the current state of the aggregate + await reader.NextResultAsync(cancellation).ConfigureAwait(false); + var events = await new ListQueryHandler(null, selector).HandleAsync(reader, session, cancellation).ConfigureAwait(false); + if (events.Any()) + { + document = await _aggregator.BuildAsync(events, session, document, id, _storage, cancellation).ConfigureAwait(false); + } + + if (document != null) + { + _storage.SetIdentity(document, id); + } + + var stream = version == 0 + ? _identityStrategy.StartStream(document, session, id, cancellation) + : _identityStrategy.AppendToStream(document, session, id, version, cancellation); + + // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest + if (session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, stream); + } + + return stream; + } + catch (Exception e) + { + if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) + { + throw new StreamLockedException(id, e.InnerException); + } + + if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) + { + throw new StreamLockedException(id, e.InnerException!); + } + + throw; + } + } + + public IQueryHandler> BuildQueryHandler(QuerySession session, TId id, bool forUpdate) + { + var dsb = session.AssertIsDocumentSession(); + if (_initialSql.IsEmpty()) + { + ensureInitialSql(dsb.EventStorage()); + } + + return new ForUpdateQueryHandler(this, id, forUpdate); + } + + public class ForUpdateQueryHandler: IQueryHandler> + { + private readonly FetchAsyncPlan _parent; + private readonly TId _id; + private readonly bool _forUpdate; + private readonly LoadByIdHandler _loadHandler; + + public ForUpdateQueryHandler(FetchAsyncPlan parent, TId id, bool forUpdate) + { 
+ _parent = parent; + _id = id; + _forUpdate = forUpdate; + _loadHandler = new LoadByIdHandler(parent._storage, id); + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + if (!_forUpdate) + { + builder.Append("begin transaction isolation level repeatable read read only"); + builder.StartNewCommand(); + } + + _parent._identityStrategy.BuildCommandForReadingVersionForStream(builder, _id, _forUpdate); + + builder.StartNewCommand(); + + _loadHandler.ConfigureCommand(builder, session); + + builder.StartNewCommand(); + + _parent.writeEventFetchStatement(_id, builder); + + if (!_forUpdate) + { + builder.StartNewCommand(); + builder.Append("end"); + } + } + + public Task> HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + var documentSessionBase = (DocumentSessionBase)session; + return _parent.ReadIntoStream(documentSessionBase, _id, token, reader, _loadHandler, documentSessionBase.EventStorage()); + } + + public IEventStream Handle(DbDataReader reader, IMartenSession session) + { + throw new NotSupportedException(); + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotSupportedException(); + } + } + +} diff --git a/src/Marten/Events/Fetching/FetchAsyncPlan.cs b/src/Marten/Events/Fetching/FetchAsyncPlan.cs index 00c5323df0..2f88643158 100644 --- a/src/Marten/Events/Fetching/FetchAsyncPlan.cs +++ b/src/Marten/Events/Fetching/FetchAsyncPlan.cs @@ -1,72 +1,18 @@ -using System; -using System.Diagnostics.CodeAnalysis; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using JasperFx; -using JasperFx.Core; using JasperFx.Core.Reflection; -using JasperFx.Events; using JasperFx.Events.Aggregation; -using JasperFx.Events.Daemon; using JasperFx.Events.Projections; -using Marten.Events.Projections; -using Marten.Exceptions; -using Marten.Internal.Sessions; using Marten.Internal.Storage; -using Marten.Linq.QueryHandlers; -using 
Marten.Schema; using Marten.Storage; -using Npgsql; -using Weasel.Core; using Weasel.Postgresql; -using Weasel.Postgresql.SqlGeneration; namespace Marten.Events.Fetching; -internal class AsyncFetchPlanner: IFetchPlanner -{ - public bool TryMatch(IDocumentStorage storage, IEventIdentityStrategy identity, StoreOptions options, - [NotNullWhen(true)]out IAggregateFetchPlan? plan) where TDoc : class where TId : notnull - { - if (options.Projections.TryFindAggregate(typeof(TDoc), out var projection)) - { - if (projection is MultiStreamProjection) - { - throw new InvalidOperationException( - $"The aggregate type {typeof(TDoc).FullNameInCode()} is the subject of a multi-stream projection and cannot be used with FetchForWriting"); - } - - - - if (projection.Scope == AggregationScope.MultiStream) - { - throw new InvalidOperationException( - $"The aggregate type {typeof(TDoc).FullNameInCode()} is the subject of a multi-stream projection and cannot be used with FetchForWriting"); - } - - if (projection.Lifecycle == ProjectionLifecycle.Async) - { - var mapping = options.Storage.FindMapping(typeof(TDoc)) as DocumentMapping; - if (mapping != null && mapping.Metadata.Revision.Enabled) - { - plan = new FetchAsyncPlan(options.EventGraph, identity, storage); - return true; - } - } - } - - plan = default; - return false; - } -} - -internal class FetchAsyncPlan: IAggregateFetchPlan where TDoc : class where TId : notnull +internal partial class FetchAsyncPlan: IAggregateFetchPlan where TDoc : class where TId : notnull { + private readonly IAggregator _aggregator; private readonly EventGraph _events; private readonly IEventIdentityStrategy _identityStrategy; private readonly IDocumentStorage _storage; - private readonly IAggregator _aggregator; private readonly string _versionSelectionSql; private string? 
_initialSql; @@ -80,7 +26,9 @@ public FetchAsyncPlan(EventGraph events, IEventIdentityStrategy identityStr // Blame strong typed identifiers for this abomination folks _aggregator = raw as IAggregator - ?? typeof(IdentityForwardingAggregator<,,,>).CloseAndBuildAs>(raw, _storage, typeof(TDoc), _storage.IdType, typeof(TId), typeof(IQuerySession)); + ?? typeof(IdentityForwardingAggregator<,,,>) + .CloseAndBuildAs>(raw, _storage, typeof(TDoc), + _storage.IdType, typeof(TId), typeof(IQuerySession)); if (_events.TenancyStyle == TenancyStyle.Single) { @@ -92,110 +40,12 @@ public FetchAsyncPlan(EventGraph events, IEventIdentityStrategy identityStr _versionSelectionSql = $" left outer join {storage.TableName.QualifiedName} as a on d.stream_id = a.id and d.tenant_id = a.tenant_id where (a.mt_version is NULL or d.version > a.mt_version) and d.stream_id = "; } - - } - [MemberNotNull(nameof(_initialSql))] - public async Task> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, CancellationToken cancellation = default) - { - await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); - await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); - - var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) - .ConfigureAwait(false); - - _initialSql ??= - $"select {selector.SelectFields().Select(x => "d." 
+ x).Join(", ")} from {_events.DatabaseSchemaName}.mt_events as d"; - - if (forUpdate) - { - await session.BeginTransactionAsync(cancellation).ConfigureAwait(false); - } - - var builder = new BatchBuilder{TenantId = session.TenantId}; - if (!forUpdate) - { - builder.Append("begin transaction isolation level repeatable read read only"); - builder.StartNewCommand(); - } - _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, forUpdate); - - builder.StartNewCommand(); - - var loadHandler = new LoadByIdHandler(_storage, id); - loadHandler.ConfigureCommand(builder, session); - - builder.StartNewCommand(); - - writeEventFetchStatement(id, builder); - - if (!forUpdate) - { - builder.StartNewCommand(); - builder.Append("end"); - } - - long version = 0; - try - { - var batch = builder.Compile(); - await using var reader = - await session.ExecuteReaderAsync(batch, cancellation).ConfigureAwait(false); - - // Read the latest version - if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) - { - version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); - } - - // Fetch the existing aggregate -- if any! - await reader.NextResultAsync(cancellation).ConfigureAwait(false); - var document = await loadHandler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); - - // Read in any events from after the current state of the aggregate - await reader.NextResultAsync(cancellation).ConfigureAwait(false); - var events = await new ListQueryHandler(null, selector).HandleAsync(reader, session, cancellation).ConfigureAwait(false); - if (events.Any()) - { - document = await _aggregator.BuildAsync(events, session, document, id, _storage, cancellation).ConfigureAwait(false); - } - - if (document != null) - { - _storage.SetIdentity(document, id); - } - - var stream = version == 0 - ? 
_identityStrategy.StartStream(document, session, id, cancellation) - : _identityStrategy.AppendToStream(document, session, id, version, cancellation); - - // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest - if (session.Options.Events.UseIdentityMapForAggregates) - { - session.StoreDocumentInItemMap(id, stream); - } - - return stream; - } - catch (Exception e) - { - if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) - { - throw new StreamLockedException(id, e.InnerException); - } - - if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) - { - throw new StreamLockedException(id, e.InnerException!); - } - - throw; - } - } + public ProjectionLifecycle Lifecycle => ProjectionLifecycle.Async; private void writeEventFetchStatement(TId id, - BatchBuilder builder) + ICommandBuilder builder) { builder.Append(_initialSql!); builder.Append(_versionSelectionSql); @@ -209,173 +59,4 @@ private void writeEventFetchStatement(TId id, builder.AppendParameter(builder.TenantId); } } - - public async Task> FetchForWriting(DocumentSessionBase session, TId id, long expectedStartingVersion, - CancellationToken cancellation = default) - { - await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); - await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); - - var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) - .ConfigureAwait(false); - - _initialSql ??= - $"select {selector.SelectFields().Select(x => "d." + x).Join(", ")} from {_events.DatabaseSchemaName}.mt_events as d"; - - // TODO -- use read only transaction???? 
- - var builder = new BatchBuilder{TenantId = session.TenantId}; - _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, false); - - builder.StartNewCommand(); - - var loadHandler = new LoadByIdHandler(_storage, id); - loadHandler.ConfigureCommand(builder, session); - - builder.StartNewCommand(); - - writeEventFetchStatement(id, builder); - - long version = 0; - try - { - var batch = builder.Compile(); - await using var reader = - await session.ExecuteReaderAsync(batch, cancellation).ConfigureAwait(false); - - // Read the latest version - if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) - { - version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); - } - - if (expectedStartingVersion != version) - { - throw new ConcurrencyException( - $"Expected the existing version to be {expectedStartingVersion}, but was {version}", - typeof(TDoc), id); - } - - // Fetch the existing aggregate -- if any! - await reader.NextResultAsync(cancellation).ConfigureAwait(false); - var document = await loadHandler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); - - // Read in any events from after the current state of the aggregate - await reader.NextResultAsync(cancellation).ConfigureAwait(false); - var events = await new ListQueryHandler(null, selector).HandleAsync(reader, session, cancellation).ConfigureAwait(false); - if (events.Any()) - { - document = await _aggregator.BuildAsync(events, session, document, id, _storage, cancellation).ConfigureAwait(false); - } - - if (document != null) - { - _storage.SetIdentity(document, id); - } - - var stream = version == 0 - ? 
_identityStrategy.StartStream(document, session, id, cancellation) - : _identityStrategy.AppendToStream(document, session, id, version, cancellation); - - // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest - if (session.Options.Events.UseIdentityMapForAggregates) - { - session.StoreDocumentInItemMap(id, stream); - } - - return stream; - } - catch (Exception e) - { - if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) - { - throw new StreamLockedException(id, e.InnerException); - } - - if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) - { - throw new StreamLockedException(id, e.InnerException!); - } - - throw; - } - } - - public async ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation) - { - // Optimization for having called FetchForWriting, then FetchLatest on same session in short order - if (session.Options.Events.UseIdentityMapForAggregates) - { - if (session.TryGetAggregateFromIdentityMap, TId>(id, out var stream)) - { - var starting = stream.Aggregate; - var appendedEvents = stream.Events; - - return await _aggregator.BuildAsync(appendedEvents, session, starting, id, _storage, cancellation).ConfigureAwait(false); - } - } - - await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); - await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); - - var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) - .ConfigureAwait(false); - - _initialSql ??= - $"select {selector.SelectFields().Select(x => "d." + x).Join(", ")} from {_events.DatabaseSchemaName}.mt_events as d"; - - // TODO -- use read only transaction???? 
- - var builder = new BatchBuilder{TenantId = session.TenantId}; - - var loadHandler = new LoadByIdHandler(_storage, id); - loadHandler.ConfigureCommand(builder, session); - - builder.StartNewCommand(); - - writeEventFetchStatement(id, builder); - - var batch = builder.Compile(); - await using var reader = - await session.ExecuteReaderAsync(batch, cancellation).ConfigureAwait(false); - - // Fetch the existing aggregate -- if any! - var document = await loadHandler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); - - // Read in any events from after the current state of the aggregate - await reader.NextResultAsync(cancellation).ConfigureAwait(false); - var events = await new ListQueryHandler(null, selector).HandleAsync(reader, session, cancellation).ConfigureAwait(false); - if (events.Any()) - { - document = await _aggregator.BuildAsync(events, session, document, id, _storage, cancellation).ConfigureAwait(false); - } - - if (document != null) - { - _storage.SetIdentity(document, id); - } - - return document; - } -} - -internal class AggregateEventFloor: ISqlFragment -{ - private readonly DbObjectName _tableName; - private readonly TId _id; - - public AggregateEventFloor(DbObjectName tableName, TId id) - { - _tableName = tableName; - _id = id; - } - - public void Apply(ICommandBuilder builder) - { - builder.Append("version > (select mt_version from "); - builder.Append(_tableName.QualifiedName); - builder.Append(" as a where a.id = "); - builder.AppendParameter(_id); - builder.Append(")"); - } } diff --git a/src/Marten/Events/Fetching/FetchForWritingExtensions.cs b/src/Marten/Events/Fetching/FetchForWritingExtensions.cs new file mode 100644 index 0000000000..5d6ea64ac4 --- /dev/null +++ b/src/Marten/Events/Fetching/FetchForWritingExtensions.cs @@ -0,0 +1,18 @@ +using System; +using Marten.Internal.Sessions; + +namespace Marten.Events.Fetching; + +internal static class FetchForWritingExtensions +{ + public static DocumentSessionBase? 
AssertIsDocumentSession(this QuerySession session) + { + if (session is not DocumentSessionBase) + { + throw new InvalidOperationException( + "Using FetchForWriting() is only possible for full, writeable IDocumentSession"); + } + + return (DocumentSessionBase?)session; + } +} diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.ExpectedVersion.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.ExpectedVersion.cs new file mode 100644 index 0000000000..63b6b3f4c2 --- /dev/null +++ b/src/Marten/Events/Fetching/FetchInlinedPlan.ExpectedVersion.cs @@ -0,0 +1,140 @@ +using System; +using System.Data.Common; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using Marten.Exceptions; +using Marten.Internal; +using Marten.Internal.Sessions; +using Marten.Linq.QueryHandlers; +using Npgsql; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal partial class FetchInlinedPlan +{ + public async Task> FetchForWriting(DocumentSessionBase session, TId id, + long expectedStartingVersion, CancellationToken cancellation = default) + { + var storage = findDocumentStorage(session); + + await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); + await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); + + var builder = new BatchBuilder { TenantId = session.TenantId }; + _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, false); + builder.Append(";"); + + builder.StartNewCommand(); + + var handler = new LoadByIdHandler(storage, id); + handler.ConfigureCommand(builder, session); + + await using var reader = + await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); + + return await ReadIntoStreamWithExpectedVersion(session, id, expectedStartingVersion, cancellation, reader, handler).ConfigureAwait(false); + } + + + private async Task> ReadIntoStreamWithExpectedVersion(DocumentSessionBase session, 
TId id, long expectedStartingVersion, + CancellationToken cancellation, DbDataReader reader, LoadByIdHandler handler) + { + long version = 0; + try + { + if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) + { + version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); + } + + if (expectedStartingVersion != version) + { + throw new ConcurrencyException( + $"Expected the existing version to be {expectedStartingVersion}, but was {version}", + typeof(TDoc), id); + } + + await reader.NextResultAsync(cancellation).ConfigureAwait(false); + var document = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + + // As an optimization, put the document in the identity map for later + if (document != null && session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, document); + } + + return version == 0 + ? _identityStrategy.StartStream(document, session, id, cancellation) + : _identityStrategy.AppendToStream(document, session, id, version, cancellation); + } + catch (Exception e) + { + if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) + { + throw new StreamLockedException(id, e.InnerException); + } + + if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) + { + throw new StreamLockedException(id, e.InnerException); + } + + throw; + } + } + + + public IQueryHandler> BuildQueryHandler(QuerySession session, TId id, long expectedStartingVersion) + { + session.AssertIsDocumentSession(); + var storage = findDocumentStorage(session); + var handler = new LoadByIdHandler(storage, id); + return new WithStartingVersionHandler(this, id, handler, expectedStartingVersion); + } + + internal class WithStartingVersionHandler: IQueryHandler> + { + private readonly FetchInlinedPlan _parent; + private readonly TId _id; + private readonly LoadByIdHandler _handler; + private readonly long _version; + + public 
WithStartingVersionHandler(FetchInlinedPlan parent, TId id, LoadByIdHandler handler, long version) + { + _parent = parent; + _id = id; + _handler = handler; + _version = version; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + _parent._identityStrategy.BuildCommandForReadingVersionForStream(builder, _id, false); + builder.StartNewCommand(); + _handler.ConfigureCommand(builder, session); + } + + #region things we don't care about + + public IEventStream Handle(DbDataReader reader, IMartenSession session) + { + throw new NotImplementedException(); + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotImplementedException(); + } + + #endregion + + public Task> HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + return _parent.ReadIntoStreamWithExpectedVersion((DocumentSessionBase)session, _id, _version, token, reader, _handler); + } + } + +} diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs new file mode 100644 index 0000000000..a88d92220e --- /dev/null +++ b/src/Marten/Events/Fetching/FetchInlinedPlan.ForReading.cs @@ -0,0 +1,48 @@ +using System.Threading; +using System.Threading.Tasks; +using Marten.Internal.Sessions; +using Marten.Linq.QueryHandlers; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal partial class FetchInlinedPlan +{ + public async ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation) + { + var storage = findDocumentStorage(session); + + // Opting into optimizations here + if (session.TryGetAggregateFromIdentityMap(id, out var doc)) + { + return doc; + } + + await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); + + var builder = new BatchBuilder { TenantId = session.TenantId }; + builder.Append(";"); + + var handler = new 
LoadByIdHandler(storage, id); + handler.ConfigureCommand(builder, session); + + await using var reader = + await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); + var document = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + + return document; + } + + public IQueryHandler BuildQueryHandler(QuerySession session, TId id) + { + var storage = findDocumentStorage(session); + + // Opting into optimizations here + if (session is DocumentSessionBase dsb && dsb.TryGetAggregateFromIdentityMap(id, out var doc)) + { + return new PreCannedQueryHandler(doc); + } + + return new LoadByIdHandler(storage, id); + } +} diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.ForUpdate.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.ForUpdate.cs new file mode 100644 index 0000000000..5264c3262c --- /dev/null +++ b/src/Marten/Events/Fetching/FetchInlinedPlan.ForUpdate.cs @@ -0,0 +1,127 @@ +using System; +using System.Data.Common; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Marten.Exceptions; +using Marten.Internal; +using Marten.Internal.Sessions; +using Marten.Internal.Storage; +using Marten.Linq.QueryHandlers; +using Npgsql; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal partial class FetchInlinedPlan +{ + public async Task> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, + CancellationToken cancellation = default) + { + IDocumentStorage? 
storage = null; + if (session.Options.Events.UseIdentityMapForAggregates) + { + storage = session.Options.ResolveCorrectedDocumentStorage(DocumentTracking.IdentityOnly); + // Opt into the identity map mechanics for this aggregate type just in case + // you're using a lightweight session + session.UseIdentityMapFor(); + } + else + { + storage = session.Options.ResolveCorrectedDocumentStorage(session.TrackingMode); + } + + await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); + await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); + + if (forUpdate) + { + await session.BeginTransactionAsync(cancellation).ConfigureAwait(false); + } + + var builder = new BatchBuilder{TenantId = session.TenantId}; + _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, forUpdate); + + builder.StartNewCommand(); + + var handler = new LoadByIdHandler(storage, id); + handler.ConfigureCommand(builder, session); + + try + { + await using var reader = + await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); + + return await ReadIntoStream(session, id, cancellation, reader, handler).ConfigureAwait(false); + } + catch (Exception e) + { + if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) + { + throw new StreamLockedException(id, e.InnerException); + } + + if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) + { + throw new StreamLockedException(id, e.InnerException); + } + + throw; + } + } + + public IQueryHandler> BuildQueryHandler(QuerySession session, TId id, bool forUpdate) + { + session.AssertIsDocumentSession(); + var storage = findDocumentStorage(session); + + var handler = new LoadByIdHandler(storage, id); + return new QueryHandler(this, id, handler, forUpdate); + } + + internal class QueryHandler: IQueryHandler> + { + private readonly FetchInlinedPlan _parent; + private readonly TId _id; + 
private readonly LoadByIdHandler _handler; + private readonly bool _forUpdate; + + public QueryHandler(FetchInlinedPlan parent, TId id, LoadByIdHandler handler, + bool forUpdate) + { + _parent = parent; + _id = id; + _handler = handler; + _forUpdate = forUpdate; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + _parent._identityStrategy.BuildCommandForReadingVersionForStream(builder, _id, _forUpdate); + + builder.StartNewCommand(); + + _handler.ConfigureCommand(builder, session); + } + + public Task> HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + return _parent.ReadIntoStream((DocumentSessionBase)session, _id, token, reader, _handler); + } + + #region stuff we don't care about + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotImplementedException(); + } + + + public IEventStream Handle(DbDataReader reader, IMartenSession session) + { + throw new NotImplementedException(); + } + + #endregion + } +} diff --git a/src/Marten/Events/Fetching/FetchInlinedPlan.cs b/src/Marten/Events/Fetching/FetchInlinedPlan.cs index a8baa22ecf..fd71561582 100644 --- a/src/Marten/Events/Fetching/FetchInlinedPlan.cs +++ b/src/Marten/Events/Fetching/FetchInlinedPlan.cs @@ -1,10 +1,14 @@ using System; using System.Collections.Generic; +using System.Data.Common; +using System.IO; using System.Threading; using System.Threading.Tasks; using JasperFx; using JasperFx.Core.Reflection; +using JasperFx.Events.Projections; using Marten.Exceptions; +using Marten.Internal; using Marten.Internal.Sessions; using Marten.Internal.Storage; using Marten.Linq.QueryHandlers; @@ -13,7 +17,7 @@ namespace Marten.Events.Fetching; -internal class FetchInlinedPlan: IAggregateFetchPlan where TDoc : class where TId : notnull +internal partial class FetchInlinedPlan: IAggregateFetchPlan where TDoc : class where TId : notnull { private readonly EventGraph _events; private readonly 
IEventIdentityStrategy _identityStrategy; @@ -24,43 +28,14 @@ internal FetchInlinedPlan(EventGraph events, IEventIdentityStrategy identit _identityStrategy = identityStrategy; } - public async Task> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, - CancellationToken cancellation = default) - { - IDocumentStorage? storage = null; - if (session.Options.Events.UseIdentityMapForAggregates) - { - storage = session.Options.ResolveCorrectedDocumentStorage(DocumentTracking.IdentityOnly); - // Opt into the identity map mechanics for this aggregate type just in case - // you're using a lightweight session - session.UseIdentityMapFor(); - } - else - { - storage = session.Options.ResolveCorrectedDocumentStorage(session.TrackingMode); - } - - await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); - await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); - - if (forUpdate) - { - await session.BeginTransactionAsync(cancellation).ConfigureAwait(false); - } - - var builder = new BatchBuilder{TenantId = session.TenantId}; - _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, forUpdate); - - builder.StartNewCommand(); - - var handler = new LoadByIdHandler(storage, id); - handler.ConfigureCommand(builder, session); + public ProjectionLifecycle Lifecycle => ProjectionLifecycle.Inline; + private async Task> ReadIntoStream(DocumentSessionBase session, TId id, CancellationToken cancellation, + DbDataReader reader, LoadByIdHandler handler) + { long version = 0; try { - await using var reader = - await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) { version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); @@ -95,8 +70,7 @@ public async Task> FetchForWriting(DocumentSessionBase sessio } } - public async Task> FetchForWriting(DocumentSessionBase 
session, TId id, - long expectedStartingVersion, CancellationToken cancellation = default) + private static IDocumentStorage findDocumentStorage(QuerySession session) { IDocumentStorage? storage = null; if (session.Options.Events.UseIdentityMapForAggregates) @@ -111,97 +85,7 @@ public async Task> FetchForWriting(DocumentSessionBase sessio storage = session.StorageFor(); } - await _identityStrategy.EnsureEventStorageExists(session, cancellation).ConfigureAwait(false); - await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); - - var builder = new BatchBuilder { TenantId = session.TenantId }; - _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, false); - builder.Append(";"); - - builder.StartNewCommand(); - - var handler = new LoadByIdHandler(storage, id); - handler.ConfigureCommand(builder, session); - - long version = 0; - try - { - await using var reader = - await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); - if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) - { - version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); - } - - if (expectedStartingVersion != version) - { - throw new ConcurrencyException( - $"Expected the existing version to be {expectedStartingVersion}, but was {version}", - typeof(TDoc), id); - } - - await reader.NextResultAsync(cancellation).ConfigureAwait(false); - var document = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); - - // As an optimization, put the document in the identity map for later - if (document != null && session.Options.Events.UseIdentityMapForAggregates) - { - session.StoreDocumentInItemMap(id, document); - } - - return version == 0 - ? 
_identityStrategy.StartStream(document, session, id, cancellation) - : _identityStrategy.AppendToStream(document, session, id, version, cancellation); - } - catch (Exception e) - { - if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) - { - throw new StreamLockedException(id, e.InnerException); - } - - if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) - { - throw new StreamLockedException(id, e.InnerException); - } - - throw; - } + return storage; } - public async ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation) - { - IDocumentStorage? storage = null; - if (session.Options.Events.UseIdentityMapForAggregates) - { - storage = (IDocumentStorage)session.Options.Providers.StorageFor().IdentityMap; - // Opt into the identity map mechanics for this aggregate type just in case - // you're using a lightweight session - session.UseIdentityMapFor(); - } - else - { - storage = session.StorageFor(); - } - - // Opting into optimizations here - if (session.TryGetAggregateFromIdentityMap(id, out var doc)) - { - return doc; - } - - await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false); - - var builder = new BatchBuilder { TenantId = session.TenantId }; - builder.Append(";"); - - var handler = new LoadByIdHandler(storage, id); - handler.ConfigureCommand(builder, session); - - await using var reader = - await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); - var document = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); - - return document; - } } diff --git a/src/Marten/Events/Fetching/FetchLivePlan.ExpectedVersion.cs b/src/Marten/Events/Fetching/FetchLivePlan.ExpectedVersion.cs new file mode 100644 index 0000000000..932fe5c4ba --- /dev/null +++ b/src/Marten/Events/Fetching/FetchLivePlan.ExpectedVersion.cs @@ -0,0 +1,138 @@ +using System; +using 
System.Collections.Generic; +using System.Data.Common; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using JasperFx; +using JasperFx.Events; +using Marten.Exceptions; +using Marten.Internal; +using Marten.Internal.Sessions; +using Marten.Linq.QueryHandlers; +using Npgsql; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal partial class FetchLivePlan +{ + public async Task> FetchForWriting(DocumentSessionBase session, TId id, + long expectedStartingVersion, + CancellationToken cancellation = default) + { + + var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) + .ConfigureAwait(false); + + var builder = new BatchBuilder{TenantId = session.TenantId}; + _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, false); + + builder.StartNewCommand(); + + var handler = _identityStrategy.BuildEventQueryHandler(id, selector); + handler.ConfigureCommand(builder, session); + + await using var reader = + await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); + + return await ReadIntoStream(session, id, expectedStartingVersion, cancellation, reader, handler).ConfigureAwait(false); + } + + private async Task> ReadIntoStream(DocumentSessionBase session, TId id, long expectedStartingVersion, + CancellationToken cancellation, DbDataReader reader, IQueryHandler> handler) + { + long version = 0; + try + { + if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) + { + version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); + } + + if (expectedStartingVersion != version) + { + throw new ConcurrencyException( + $"Expected the existing version to be {expectedStartingVersion}, but was {version}", + typeof(TDoc), id); + } + + await reader.NextResultAsync(cancellation).ConfigureAwait(false); + var events = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + var document = await 
_aggregator.BuildAsync(events, session, default, id, _documentStorage, cancellation).ConfigureAwait(false); + + var stream = version == 0 + ? _identityStrategy.StartStream(document, session, id, cancellation) + : _identityStrategy.AppendToStream(document, session, id, version, cancellation); + + // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest + if (session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, stream); + } + + return stream; + } + catch (Exception e) + { + if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) + { + throw new StreamLockedException(id, e.InnerException); + } + + if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) + { + throw new StreamLockedException(id, e.InnerException); + } + + throw; + } + } + + public IQueryHandler> BuildQueryHandler(QuerySession session, TId id, long expectedStartingVersion) + { + session.AssertIsDocumentSession(); + return new ExpectedVersionQueryHandler(this, id, expectedStartingVersion); + } + + public class ExpectedVersionQueryHandler: IQueryHandler> + { + private readonly FetchLivePlan _parent; + private readonly TId _id; + private readonly long _expectedStartingVersion; + private readonly IQueryHandler> _handler; + + public ExpectedVersionQueryHandler(FetchLivePlan parent, TId id, long expectedStartingVersion) + { + _parent = parent; + _id = id; + _expectedStartingVersion = expectedStartingVersion; + _handler = _parent._identityStrategy.BuildEventQueryHandler(_id); + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + _parent._identityStrategy.BuildCommandForReadingVersionForStream(builder, _id, false); + + builder.StartNewCommand(); + + _handler.ConfigureCommand(builder, session); + } + + public Task> HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + return 
_parent.ReadIntoStream((DocumentSessionBase)session, _id, _expectedStartingVersion, token, reader, _handler); + } + + public IEventStream Handle(DbDataReader reader, IMartenSession session) + { + throw new NotSupportedException(); + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotSupportedException(); + } + } + +} diff --git a/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs b/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs new file mode 100644 index 0000000000..e5849d84a8 --- /dev/null +++ b/src/Marten/Events/Fetching/FetchLivePlan.ForReading.cs @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten.Internal; +using Marten.Internal.Sessions; +using Marten.Linq.QueryHandlers; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal partial class FetchLivePlan +{ + public async ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation) + { + // Optimization for having called FetchForWriting, then FetchLatest on same session in short order + if (session.Options.Events.UseIdentityMapForAggregates) + { + if (session.TryGetAggregateFromIdentityMap, TId>(id, out var stream)) + { + var starting = stream.Aggregate; + var appendedEvents = stream.Events; + + return await _aggregator.BuildAsync(appendedEvents, session, starting, id, _documentStorage, cancellation).ConfigureAwait(false); + } + } + + var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) + .ConfigureAwait(false); + + var builder = new BatchBuilder{TenantId = session.TenantId}; + + var handler = _identityStrategy.BuildEventQueryHandler(id, selector); + handler.ConfigureCommand(builder, session); + + await using var reader = + await session.ExecuteReaderAsync(builder.Compile(), 
cancellation).ConfigureAwait(false); + + var events = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + return await _aggregator.BuildAsync(events, session, default, id, _documentStorage, cancellation).ConfigureAwait(false); + } + + public IQueryHandler BuildQueryHandler(QuerySession session, TId id) + { + return new ReadOnlyQueryHandler(this, id); + } + + public class ReadOnlyQueryHandler: IQueryHandler + { + private readonly FetchLivePlan _parent; + private readonly TId _id; + private readonly IQueryHandler> _handler; + + public ReadOnlyQueryHandler(FetchLivePlan parent, TId id) + { + _parent = parent; + _id = id; + + _handler = parent._identityStrategy.BuildEventQueryHandler(id); + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + _handler.ConfigureCommand(builder, session); + } + + public TDoc? Handle(DbDataReader reader, IMartenSession session) + { + throw new NotSupportedException(); + } + + public async Task HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + var events = await _handler.HandleAsync(reader, session, token).ConfigureAwait(false); + return await _parent._aggregator.BuildAsync(events, (QuerySession)session, default, _id, _parent._documentStorage, token).ConfigureAwait(false); + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotSupportedException(); + } + } +} diff --git a/src/Marten/Events/Fetching/FetchLivePlan.ForUpdate.cs b/src/Marten/Events/Fetching/FetchLivePlan.ForUpdate.cs new file mode 100644 index 0000000000..e6dfa309e5 --- /dev/null +++ b/src/Marten/Events/Fetching/FetchLivePlan.ForUpdate.cs @@ -0,0 +1,155 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Events; +using Marten.Exceptions; +using Marten.Internal; +using Marten.Internal.Sessions; +using 
Marten.Linq.QueryHandlers; +using Npgsql; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal partial class FetchLivePlan +{ + public async Task> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, + CancellationToken cancellation = default) + { + var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) + .ConfigureAwait(false); + + if (forUpdate) + { + await session.BeginTransactionAsync(cancellation).ConfigureAwait(false); + } + + var builder = new BatchBuilder{TenantId = session.TenantId}; + _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, forUpdate); + + builder.StartNewCommand(); + + var handler = _identityStrategy.BuildEventQueryHandler(id, selector); + handler.ConfigureCommand(builder, session); + + try + { + await using var reader = + await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); + + return await ReadIntoStream(session, id, cancellation, reader, handler).ConfigureAwait(false); + } + catch (Exception e) + { + if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) + { + throw new StreamLockedException(id, e.InnerException); + } + + if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) + { + throw new StreamLockedException(id, e.InnerException); + } + + throw; + } + } + + private async Task> ReadIntoStream(DocumentSessionBase session, TId id, CancellationToken cancellation, + DbDataReader reader, IQueryHandler> handler) + { + long version = 0; + try + { + if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) + { + version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); + } + + await reader.NextResultAsync(cancellation).ConfigureAwait(false); + var events = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); + var document = await _aggregator.BuildAsync(events, session, default, id, _documentStorage, 
cancellation).ConfigureAwait(false); + if (document != null) + { + _documentStorage.SetIdentity(document, id); + } + + var stream = version == 0 + ? _identityStrategy.StartStream(document, session, id, cancellation) + : _identityStrategy.AppendToStream(document, session, id, version, cancellation); + + // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest + if (session.Options.Events.UseIdentityMapForAggregates) + { + session.StoreDocumentInItemMap(id, stream); + } + + return stream; + } + catch (Exception e) + { + if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) + { + throw new StreamLockedException(id, e.InnerException); + } + + if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) + { + throw new StreamLockedException(id, e.InnerException); + } + + throw; + } + } + + public IQueryHandler> BuildQueryHandler(QuerySession session, TId id, bool forUpdate) + { + session.AssertIsDocumentSession(); + return new ForUpdateQueryHandler(this, id, forUpdate); + } + + public class ForUpdateQueryHandler : IQueryHandler> + { + private readonly FetchLivePlan _parent; + private readonly TId _id; + private readonly bool _forUpdate; + private readonly IQueryHandler> _handler; + + public ForUpdateQueryHandler(FetchLivePlan parent, TId id, bool forUpdate) + { + _parent = parent; + _id = id; + _forUpdate = forUpdate; + + _handler = _parent._identityStrategy.BuildEventQueryHandler(_id); + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + _parent._identityStrategy.BuildCommandForReadingVersionForStream(builder, _id, _forUpdate); + + builder.StartNewCommand(); + + _handler.ConfigureCommand(builder, session); + } + + public Task> HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + return _parent.ReadIntoStream((DocumentSessionBase)session, _id, token, reader, _handler); + } + + public IEventStream 
Handle(DbDataReader reader, IMartenSession session) + { + throw new NotSupportedException(); + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotSupportedException(); + } + } + +} diff --git a/src/Marten/Events/Fetching/FetchLivePlan.cs b/src/Marten/Events/Fetching/FetchLivePlan.cs index 04973d5306..cb0b52e3fa 100644 --- a/src/Marten/Events/Fetching/FetchLivePlan.cs +++ b/src/Marten/Events/Fetching/FetchLivePlan.cs @@ -5,16 +5,16 @@ using JasperFx.Core.Reflection; using JasperFx.Events.Aggregation; using JasperFx.Events.Projections; -using Marten.Events.Projections; using Marten.Exceptions; using Marten.Internal.Sessions; using Marten.Internal.Storage; +using Marten.Linq.QueryHandlers; using Npgsql; using Weasel.Postgresql; namespace Marten.Events.Fetching; -internal class FetchLivePlan: IAggregateFetchPlan where TDoc : class where TId : notnull +internal partial class FetchLivePlan: IAggregateFetchPlan where TDoc : class where TId : notnull { private readonly IAggregator _aggregator; private readonly IDocumentStorage _documentStorage; @@ -32,162 +32,5 @@ public FetchLivePlan(EventGraph events, IEventIdentityStrategy identityStra ?? 
typeof(IdentityForwardingAggregator<,,,>).CloseAndBuildAs>(raw, _documentStorage, typeof(TDoc), _documentStorage.IdType, typeof(TId), typeof(IQuerySession)); } - public async Task> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate, - CancellationToken cancellation = default) - { - var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) - .ConfigureAwait(false); - - if (forUpdate) - { - await session.BeginTransactionAsync(cancellation).ConfigureAwait(false); - } - - var builder = new BatchBuilder{TenantId = session.TenantId}; - _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, forUpdate); - - builder.StartNewCommand(); - - var handler = _identityStrategy.BuildEventQueryHandler(id, selector); - handler.ConfigureCommand(builder, session); - - long version = 0; - try - { - await using var reader = - await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); - if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) - { - version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); - } - - await reader.NextResultAsync(cancellation).ConfigureAwait(false); - var events = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); - var document = await _aggregator.BuildAsync(events, session, default, id, _documentStorage, cancellation).ConfigureAwait(false); - if (document != null) - { - _documentStorage.SetIdentity(document, id); - } - - var stream = version == 0 - ? 
_identityStrategy.StartStream(document, session, id, cancellation) - : _identityStrategy.AppendToStream(document, session, id, version, cancellation); - - // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest - if (session.Options.Events.UseIdentityMapForAggregates) - { - session.StoreDocumentInItemMap(id, stream); - } - - return stream; - } - catch (Exception e) - { - if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) - { - throw new StreamLockedException(id, e.InnerException); - } - - if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) - { - throw new StreamLockedException(id, e.InnerException); - } - - throw; - } - } - - public async Task> FetchForWriting(DocumentSessionBase session, TId id, - long expectedStartingVersion, - CancellationToken cancellation = default) - { - - var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) - .ConfigureAwait(false); - - var builder = new BatchBuilder{TenantId = session.TenantId}; - _identityStrategy.BuildCommandForReadingVersionForStream(builder, id, false); - - builder.StartNewCommand(); - - var handler = _identityStrategy.BuildEventQueryHandler(id, selector); - handler.ConfigureCommand(builder, session); - - long version = 0; - try - { - await using var reader = - await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); - if (await reader.ReadAsync(cancellation).ConfigureAwait(false)) - { - version = await reader.GetFieldValueAsync(0, cancellation).ConfigureAwait(false); - } - - if (expectedStartingVersion != version) - { - throw new ConcurrencyException( - $"Expected the existing version to be {expectedStartingVersion}, but was {version}", - typeof(TDoc), id); - } - - await reader.NextResultAsync(cancellation).ConfigureAwait(false); - var events = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); - var document = await 
_aggregator.BuildAsync(events, session, default, id, _documentStorage, cancellation).ConfigureAwait(false); - - var stream = version == 0 - ? _identityStrategy.StartStream(document, session, id, cancellation) - : _identityStrategy.AppendToStream(document, session, id, version, cancellation); - - // This is an optimization for calling FetchForWriting, then immediately calling FetchLatest - if (session.Options.Events.UseIdentityMapForAggregates) - { - session.StoreDocumentInItemMap(id, stream); - } - - return stream; - } - catch (Exception e) - { - if (e.InnerException is NpgsqlException { SqlState: PostgresErrorCodes.InFailedSqlTransaction }) - { - throw new StreamLockedException(id, e.InnerException); - } - - if (e.Message.Contains(MartenCommandException.MaybeLockedRowsMessage)) - { - throw new StreamLockedException(id, e.InnerException); - } - - throw; - } - } - - public async ValueTask FetchForReading(DocumentSessionBase session, TId id, CancellationToken cancellation) - { - // Optimization for having called FetchForWriting, then FetchLatest on same session in short order - if (session.Options.Events.UseIdentityMapForAggregates) - { - if (session.TryGetAggregateFromIdentityMap, TId>(id, out var stream)) - { - var starting = stream.Aggregate; - var appendedEvents = stream.Events; - - return await _aggregator.BuildAsync(appendedEvents, session, starting, id, _documentStorage, cancellation).ConfigureAwait(false); - } - } - - var selector = await _identityStrategy.EnsureEventStorageExists(session, cancellation) - .ConfigureAwait(false); - - var builder = new BatchBuilder{TenantId = session.TenantId}; - - var handler = _identityStrategy.BuildEventQueryHandler(id, selector); - handler.ConfigureCommand(builder, session); - - await using var reader = - await session.ExecuteReaderAsync(builder.Compile(), cancellation).ConfigureAwait(false); - - var events = await handler.HandleAsync(reader, session, cancellation).ConfigureAwait(false); - return await 
_aggregator.BuildAsync(events, session, default, id, _documentStorage, cancellation).ConfigureAwait(false); - } + public ProjectionLifecycle Lifecycle => ProjectionLifecycle.Live; } diff --git a/src/Marten/Events/Fetching/PreCannedQueryHandler.cs b/src/Marten/Events/Fetching/PreCannedQueryHandler.cs new file mode 100644 index 0000000000..8f65bdba04 --- /dev/null +++ b/src/Marten/Events/Fetching/PreCannedQueryHandler.cs @@ -0,0 +1,40 @@ +using System; +using System.Data.Common; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Marten.Internal; +using Marten.Linq.QueryHandlers; +using Weasel.Postgresql; + +namespace Marten.Events.Fetching; + +internal class PreCannedQueryHandler: IQueryHandler +{ + private readonly T _value; + + public PreCannedQueryHandler(T value) + { + _value = value; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append("select 1"); + } + + public T Handle(DbDataReader reader, IMartenSession session) + { + return _value; + } + + public Task HandleAsync(DbDataReader reader, IMartenSession session, CancellationToken token) + { + return Task.FromResult(_value); + } + + public Task StreamJson(Stream stream, DbDataReader reader, CancellationToken token) + { + throw new NotImplementedException(); + } +} diff --git a/src/Marten/Events/StubEventStream.cs b/src/Marten/Events/StubEventStream.cs new file mode 100644 index 0000000000..bee7548347 --- /dev/null +++ b/src/Marten/Events/StubEventStream.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using JasperFx.Events; + +namespace Marten.Events; + +/// +/// A testing standin fake for IEventStream that might be helpful in +/// unit testing +/// +/// +public class StubEventStream : IEventStream where T : notnull +{ + /// + /// Start from an existing aggregate -- or null + /// + /// + public StubEventStream(T? 
aggregate) + { + Aggregate = aggregate; + + EventGraph = new EventGraph(new StoreOptions()); + } + + /// + /// Start from an existing aggregate and a configuration for Marten. + /// You only care about this overload if you are customizing event + /// type aliases + /// + /// + /// + public StubEventStream(T? aggregate, StoreOptions options) + { + Aggregate = aggregate; + + EventGraph = new EventGraph(options); + } + + internal EventGraph EventGraph { get; } + + public void AppendOne(object @event) + { + EventsAppended.Add(@event); + } + + public void AppendMany(params object[] events) + { + EventsAppended.AddRange(events); + } + + public void AppendMany(IEnumerable events) + { + EventsAppended.AddRange(events); + } + + /// + /// A record of any events appended to this stream + /// + public List EventsAppended { get; } = new(); + + public T? Aggregate { get; } + public long? StartingVersion { get; set; } + public long? CurrentVersion { get; set; } + public CancellationToken Cancellation { get; } = default; + public Guid Id { get; set; } = Guid.NewGuid(); + public string Key { get; set; } = Guid.NewGuid().ToString(); + + public IReadOnlyList Events => EventsAppended.Select(x => EventGraph.BuildEvent(x)).ToList(); +} diff --git a/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs b/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs new file mode 100644 index 0000000000..47030d0c71 --- /dev/null +++ b/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs @@ -0,0 +1,182 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using JasperFx.Core.Reflection; +using JasperFx.Events; +using JasperFx.Events.Projections; +using Marten.Events; +using Marten.Events.Fetching; +using Marten.Events.Querying; +using Marten.Linq.QueryHandlers; + +namespace Marten.Services.BatchQuerying; + +internal partial class BatchedQuery: IBatchEvents +{ + public Task Load(Guid id) + { + _documentTypes.Add(typeof(IEvent)); + 
var handler = new SingleEventQueryHandler(id, Parent.EventStorage()); + return AddItem(handler); + } + + public Task FetchStreamState(Guid streamId) + { + _documentTypes.Add(typeof(IEvent)); + var handler = Parent.EventStorage() + .QueryForStream(StreamAction.ForReference(streamId, Parent.TenantId)); + + return AddItem(handler); + } + + public Task FetchStreamState(string streamKey) + { + _documentTypes.Add(typeof(IEvent)); + var handler = Parent.EventStorage() + .QueryForStream(StreamAction.ForReference(streamKey, Parent.TenantId)); + + return AddItem(handler); + } + + public Task> FetchStream(Guid streamId, long version = 0, DateTimeOffset? timestamp = null, + long fromVersion = 0) + { + _documentTypes.Add(typeof(IEvent)); + var selector = Parent.EventStorage(); + var statement = new EventStatement(selector) + { + StreamId = streamId, + Version = version, + Timestamp = timestamp, + TenantId = Parent.TenantId, + FromVersion = fromVersion + }; + + IQueryHandler> handler = new ListQueryHandler(statement, selector); + + return AddItem(handler); + } + + public Task> FetchStream(string streamKey, long version = 0, DateTimeOffset? 
timestamp = null, + long fromVersion = 0) + { + _documentTypes.Add(typeof(IEvent)); + var selector = Parent.EventStorage(); + var statement = new EventStatement(selector) + { + StreamKey = streamKey, + Version = version, + Timestamp = timestamp, + TenantId = Parent.TenantId, + FromVersion = fromVersion + }; + + IQueryHandler> handler = new ListQueryHandler(statement, selector); + + return AddItem(handler); + } + + public Task> FetchForWriting(Guid id) where T : class + { + _documentTypes.Add(typeof(IEvent)); + var plan = Parent.Events.As().FindFetchPlan(); + if (plan.Lifecycle != ProjectionLifecycle.Live) + { + _documentTypes.Add(typeof(T)); + } + var handler = plan.BuildQueryHandler(Parent, id, false); + return AddItem(handler); + } + + public Task> FetchForWriting(string key) where T : class + { + _documentTypes.Add(typeof(IEvent)); + + var plan = Parent.Events.As().FindFetchPlan(); + if (plan.Lifecycle != ProjectionLifecycle.Live) + { + _documentTypes.Add(typeof(T)); + } + + var handler = plan.BuildQueryHandler(Parent, key, false); + return AddItem(handler); + } + + public Task> FetchForWriting(Guid id, long expectedVersion) where T : class + { + _documentTypes.Add(typeof(IEvent)); + var plan = Parent.Events.As().FindFetchPlan(); + if (plan.Lifecycle != ProjectionLifecycle.Live) + { + _documentTypes.Add(typeof(T)); + } + var handler = plan.BuildQueryHandler(Parent, id, expectedVersion); + return AddItem(handler); + } + + public Task> FetchForWriting(string key, long expectedVersion) where T : class + { + _documentTypes.Add(typeof(IEvent)); + var plan = Parent.Events.As().FindFetchPlan(); + if (plan.Lifecycle != ProjectionLifecycle.Live) + { + _documentTypes.Add(typeof(T)); + } + var handler = plan.BuildQueryHandler(Parent, key, expectedVersion); + return AddItem(handler); + } + + public async Task> FetchForExclusiveWriting(Guid id) where T : class + { + await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false); + + 
_documentTypes.Add(typeof(IEvent)); + var plan = Parent.Events.As().FindFetchPlan(); + if (plan.Lifecycle != ProjectionLifecycle.Live) + { + _documentTypes.Add(typeof(T)); + } + var handler = plan.BuildQueryHandler(Parent, id, true); + + return await AddItem(handler).ConfigureAwait(false); + } + + public async Task> FetchForExclusiveWriting(string key) where T : class + { + await Parent.BeginTransactionAsync(CancellationToken.None).ConfigureAwait(false); + + _documentTypes.Add(typeof(IEvent)); + var plan = Parent.Events.As().FindFetchPlan(); + if (plan.Lifecycle != ProjectionLifecycle.Live) + { + _documentTypes.Add(typeof(T)); + } + var handler = plan.BuildQueryHandler(Parent, key, true); + return await AddItem(handler).ConfigureAwait(false); + } + + public Task FetchLatest(Guid id) where T : class + { + _documentTypes.Add(typeof(IEvent)); + var plan = Parent.Events.As().FindFetchPlan(); + if (plan.Lifecycle != ProjectionLifecycle.Live) + { + _documentTypes.Add(typeof(T)); + } + var handler = plan.BuildQueryHandler(Parent, id); + return AddItem(handler); + } + + public Task FetchLatest(string id) where T : class + { + _documentTypes.Add(typeof(IEvent)); + var plan = Parent.Events.As().FindFetchPlan(); + if (plan.Lifecycle != ProjectionLifecycle.Live) + { + _documentTypes.Add(typeof(T)); + } + var handler = plan.BuildQueryHandler(Parent, id); + return AddItem(handler); + } +} diff --git a/src/Marten/Services/BatchQuerying/BatchedQuery.cs b/src/Marten/Services/BatchQuerying/BatchedQuery.cs index 5f233b484d..6d6ef970ba 100644 --- a/src/Marten/Services/BatchQuerying/BatchedQuery.cs +++ b/src/Marten/Services/BatchQuerying/BatchedQuery.cs @@ -18,7 +18,7 @@ namespace Marten.Services.BatchQuerying; -internal class BatchedQuery: IBatchedQuery, IBatchEvents +internal partial class BatchedQuery: IBatchedQuery { private readonly List _documentTypes = new(); private readonly IList _items = new List(); @@ -120,69 +120,6 @@ public Task Query(ICompiledQuery query) w return 
AddItem(handler); } - - public Task Load(Guid id) - { - _documentTypes.Add(typeof(IEvent)); - var handler = new SingleEventQueryHandler(id, Parent.EventStorage()); - return AddItem(handler); - } - - public Task FetchStreamState(Guid streamId) - { - _documentTypes.Add(typeof(IEvent)); - var handler = Parent.EventStorage() - .QueryForStream(StreamAction.ForReference(streamId, Parent.TenantId)); - - return AddItem(handler); - } - - public Task FetchStreamState(string streamKey) - { - _documentTypes.Add(typeof(IEvent)); - var handler = Parent.EventStorage() - .QueryForStream(StreamAction.ForReference(streamKey, Parent.TenantId)); - - return AddItem(handler); - } - - public Task> FetchStream(Guid streamId, long version = 0, DateTimeOffset? timestamp = null, - long fromVersion = 0) - { - _documentTypes.Add(typeof(IEvent)); - var selector = Parent.EventStorage(); - var statement = new EventStatement(selector) - { - StreamId = streamId, - Version = version, - Timestamp = timestamp, - TenantId = Parent.TenantId, - FromVersion = fromVersion - }; - - IQueryHandler> handler = new ListQueryHandler(statement, selector); - - return AddItem(handler); - } - - public Task> FetchStream(string streamKey, long version = 0, DateTimeOffset? 
timestamp = null, long fromVersion = 0) - { - _documentTypes.Add(typeof(IEvent)); - var selector = Parent.EventStorage(); - var statement = new EventStatement(selector) - { - StreamKey = streamKey, - Version = version, - Timestamp = timestamp, - TenantId = Parent.TenantId, - FromVersion = fromVersion - }; - - IQueryHandler> handler = new ListQueryHandler(statement, selector); - - return AddItem(handler); - } - public Task AddItem(IQueryHandler handler) { var item = new BatchQueryItem(handler); diff --git a/src/Marten/Services/BatchQuerying/IBatchedQuery.cs b/src/Marten/Services/BatchQuerying/IBatchedQuery.cs index 700e758b64..db6e1b28c5 100644 --- a/src/Marten/Services/BatchQuerying/IBatchedQuery.cs +++ b/src/Marten/Services/BatchQuerying/IBatchedQuery.cs @@ -55,6 +55,85 @@ Task> FetchStream(Guid streamId, long version = 0, DateTim /// Task> FetchStream(string streamKey, long version = 0, DateTimeOffset? timestamp = null, long fromVersion = 0); + + /// + /// Fetch the projected aggregate T by id with built in optimistic concurrency checks + /// starting at the point the aggregate was fetched. + /// + /// + /// + /// + Task> FetchForWriting(Guid id) where T : class; + + /// + /// Fetch the projected aggregate T by id with built in optimistic concurrency checks + /// starting at the point the aggregate was fetched. + /// + /// + /// + /// + /// + Task> FetchForWriting(string key) where T : class; + + /// + /// Fetch projected aggregate T by id and expected, current version of the aggregate. Will fail immediately + /// with ConcurrencyInjection if the expectedVersion is stale. Builds in optimistic concurrency for later + /// + /// + /// + /// + /// + Task> FetchForWriting(Guid id, long expectedVersion) + where T : class; + + /// + /// Fetch projected aggregate T by id and expected, current version of the aggregate. Will fail immediately + /// with ConcurrencyInjection if the expectedVersion is stale. 
Builds in optimistic concurrency for later + /// + /// + /// + /// + /// + /// + Task> FetchForWriting(string key, long expectedVersion) + where T : class; + + /// + /// Fetch projected aggregate T by id for exclusive writing + /// + /// + /// + /// + Task> FetchForExclusiveWriting(Guid id) + where T : class; + + /// + /// Fetch projected aggregate T by id for exclusive writing + /// + /// + /// + /// + /// + Task> FetchForExclusiveWriting(string key) + where T : class; + + /// + /// Fetch the projected aggregate T by id. This API functions regardless of the projection lifecycle, + /// and should be thought of as a lightweight, read-only version of FetchForWriting + /// + /// + /// + /// + Task FetchLatest(Guid id) where T : class; + + /// + /// Fetch the projected aggregate T by id. This API functions regardless of the projection lifecycle, + /// and should be thought of as a lightweight, read-only version of FetchForWriting + /// + /// + /// + /// + Task FetchLatest(string id) where T : class; } public interface IBatchedQuery @@ -164,4 +243,5 @@ public interface IBatchedQuery /// /// Task QueryByPlan(IBatchQueryPlan plan); + }