diff --git a/src/DaemonTests/Aggregations/build_aggregate_projection.cs b/src/DaemonTests/Aggregations/build_aggregate_projection.cs index 71a626e05f..5b83a29720 100644 --- a/src/DaemonTests/Aggregations/build_aggregate_projection.cs +++ b/src/DaemonTests/Aggregations/build_aggregate_projection.cs @@ -42,13 +42,13 @@ public async Task simple_scenario() await store.Advanced.Clean.DeleteAllEventDataAsync(); using var session = store.LightweightSession(); - session.ForTenant("blue").Events.StartStream("one", new AEvent(), new BEvent()); - session.ForTenant("blue").Events.StartStream("two", new BEvent(), new BEvent()); - session.ForTenant("blue").Events.StartStream("three", new AEvent(), new AEvent()); + session.ForTenant("blue").Events.StartStream("one", new MTAEvent(), new MTBEvent()); + session.ForTenant("blue").Events.StartStream("two", new MTBEvent(), new MTBEvent()); + session.ForTenant("blue").Events.StartStream("three", new MTAEvent(), new MTAEvent()); - session.ForTenant("red").Events.StartStream("one", new AEvent(), new BEvent(), new AEvent()); - session.ForTenant("red").Events.StartStream("two", new BEvent(), new BEvent(), new BEvent()); - session.ForTenant("red").Events.StartStream("five", new BEvent(), new BEvent(), new BEvent()); + session.ForTenant("red").Events.StartStream("one", new MTAEvent(), new MTBEvent(), new MTAEvent()); + session.ForTenant("red").Events.StartStream("two", new MTBEvent(), new MTBEvent(), new MTBEvent()); + session.ForTenant("red").Events.StartStream("five", new MTBEvent(), new MTBEvent(), new MTBEvent()); await session.SaveChangesAsync(); @@ -552,12 +552,12 @@ public class SimpleEntity: ITenanted public string TenantId { get; set; } - public void Apply(AEvent _) + public void Apply(MTAEvent _) { A++; } - public void Apply(BEvent _) + public void Apply(MTBEvent _) { B++; } diff --git a/src/DaemonTests/Aggregations/converting_projection_from_inline_to_async.cs b/src/DaemonTests/Aggregations/converting_projection_from_inline_to_async.cs index 9fd961a3cb..9bf778cf29 100644 --- a/src/DaemonTests/Aggregations/converting_projection_from_inline_to_async.cs +++ b/src/DaemonTests/Aggregations/converting_projection_from_inline_to_async.cs @@ -20,9 +20,9 @@ public async Task start_as_inline_move_to_async_and_just_continue() opts.Projections.Snapshot(SnapshotLifecycle.Inline); }); - var id1 = theSession.Events.StartStream(new AEvent(), new BEvent()).Id; - var id2 = theSession.Events.StartStream(new BEvent(), new CEvent()).Id; - var id3 = theSession.Events.StartStream(new CEvent(), new DEvent()).Id; + var id1 = theSession.Events.StartStream(new MTAEvent(), new MTBEvent()).Id; + var id2 = theSession.Events.StartStream(new MTBEvent(), new MTCEvent()).Id; + var id3 = theSession.Events.StartStream(new MTCEvent(), new MTDEvent()).Id; await theSession.SaveChangesAsync(); var store2 = SeparateStore(opts => @@ -46,9 +46,9 @@ public async Task start_as_inline_move_to_async_and_just_continue() await daemon.StartAllAsync(); using var session = store2.LightweightSession(); - session.Events.Append(id1, new EEvent(), new EEvent()); - session.Events.Append(id2, new EEvent(), new EEvent()); - session.Events.Append(id3, new EEvent(), new EEvent()); + session.Events.Append(id1, new MTEEvent(), new MTEEvent()); + session.Events.Append(id2, new MTEEvent(), new MTEEvent()); + session.Events.Append(id3, new MTEEvent(), new MTEEvent()); await session.SaveChangesAsync(); await daemon.WaitForNonStaleData(10.Seconds()); @@ -91,27 +91,27 @@ public class SimpleAggregate : IRevisioned public int DCount { get; set; } public int ECount { get; set; } - public void Apply(AEvent _) + public void Apply(MTAEvent _) { ACount++; } - public void Apply(BEvent _) + public void Apply(MTBEvent _) { BCount++; } - public void Apply(CEvent _) + public void Apply(MTCEvent _) { CCount++; } - public void Apply(DEvent _) + public void Apply(MTDEvent _) { DCount++; } - public void Apply(EEvent _) + public void Apply(MTEEvent _) { ECount++; } diff --git a/src/DaemonTests/Aggregations/side_effects_in_aggregations.cs b/src/DaemonTests/Aggregations/side_effects_in_aggregations.cs index d08f4e295a..045cab109a 100644 --- a/src/DaemonTests/Aggregations/side_effects_in_aggregations.cs +++ b/src/DaemonTests/Aggregations/side_effects_in_aggregations.cs @@ -44,8 +44,8 @@ public async Task add_events_single_stream_guid_identifier() await daemon.StartAllAsync(); var streamId = Guid.NewGuid(); - theSession.Events.StartStream(streamId, new AEvent(), - new AEvent(), new AEvent()); + theSession.Events.StartStream(streamId, new MTAEvent(), + new MTAEvent(), new MTAEvent()); await theSession.SaveChangesAsync(); await daemon.WaitForNonStaleData(30.Seconds()); @@ -54,7 +54,7 @@ public async Task add_events_single_stream_guid_identifier() version1.A.ShouldBe(3); version1.B.ShouldBe(0); - theSession.Events.Append(streamId, new AEvent(), new AEvent()); + theSession.Events.Append(streamId, new MTAEvent(), new MTAEvent()); await theSession.SaveChangesAsync(); @@ -102,13 +102,13 @@ public async Task calls_side_effects_when_there_is_a_delete_event() await daemon.StartAllAsync(); var streamId = Guid.NewGuid(); - theSession.Events.StartStream(streamId, new AEvent(), - new AEvent(), new AEvent()); + theSession.Events.StartStream(streamId, new MTAEvent(), + new MTAEvent(), new MTAEvent()); await theSession.SaveChangesAsync(); await daemon.WaitForNonStaleData(30.Seconds()); - theSession.Events.Append(streamId, new EEvent()); + theSession.Events.Append(streamId, new MTEEvent()); await theSession.SaveChangesAsync(); await daemon.WaitForNonStaleData(30.Seconds()); @@ -141,8 +141,8 @@ public async Task add_events_single_stream_guid_identifier_when_starting_a_strea await daemon.StartAllAsync(); var streamId = Guid.NewGuid(); - theSession.Events.StartStream(streamId, new AEvent(), - new AEvent(), new AEvent(), new AEvent(), new AEvent()); + theSession.Events.StartStream(streamId, new MTAEvent(), + new MTAEvent(), new MTAEvent(), new MTAEvent(), new MTAEvent()); await theSession.SaveChangesAsync(); // Prove the BEevent side effect happened as expected @@ -187,8 +187,8 @@ public async Task add_events_single_stream_string_identifier_when_starting_a_str await daemon.StartAllAsync(); var streamKey = Guid.NewGuid().ToString(); - theSession.Events.StartStream(streamKey, new AEvent(), - new AEvent(), new AEvent(), new AEvent(), new AEvent()); + theSession.Events.StartStream(streamKey, new MTAEvent(), + new MTAEvent(), new MTAEvent(), new MTAEvent(), new MTAEvent()); await theSession.SaveChangesAsync(); // Prove the BEevent side effect happened as expected @@ -233,8 +233,8 @@ public async Task add_events_single_stream_string_identifier() await daemon.StartAllAsync(); var streamKey = Guid.NewGuid().ToString(); - theSession.Events.StartStream(streamKey, new AEvent(), - new AEvent(), new AEvent()); + theSession.Events.StartStream(streamKey, new MTAEvent(), + new MTAEvent(), new MTAEvent()); await theSession.SaveChangesAsync(); await daemon.WaitForNonStaleData(30.Seconds()); @@ -243,7 +243,7 @@ public async Task add_events_single_stream_string_identifier() version1.A.ShouldBe(3); version1.B.ShouldBe(0); - theSession.Events.Append(streamKey, new AEvent(), new AEvent()); + theSession.Events.Append(streamKey, new MTAEvent(), new MTAEvent()); await theSession.SaveChangesAsync(); @@ -290,8 +290,8 @@ public async Task side_effects_do_not_happen_in_rebuilds() await daemon.StartAllAsync(); var streamKey = Guid.NewGuid().ToString(); - theSession.Events.StartStream(streamKey, new AEvent(), - new AEvent(), new AEvent(), new AEvent(), new AEvent()); + theSession.Events.StartStream(streamKey, new MTAEvent(), + new MTAEvent(), new MTAEvent(), new MTAEvent(), new MTAEvent()); await theSession.SaveChangesAsync(); // Prove the BEevent side effect happened as expected @@ -341,9 +341,9 @@ public async Task publishing_messages_in_continuous_mode() var stream2 = Guid.NewGuid(); var stream3 = Guid.NewGuid(); - theSession.Events.StartStream(stream1, new AEvent(), new AEvent()); - theSession.Events.StartStream(stream2, new AEvent(), new AEvent()); - theSession.Events.StartStream(stream3, new AEvent(), new AEvent(), new BEvent()); + theSession.Events.StartStream(stream1, new MTAEvent(), new MTAEvent()); + theSession.Events.StartStream(stream2, new MTAEvent(), new MTAEvent()); + theSession.Events.StartStream(stream3, new MTAEvent(), new MTAEvent(), new MTBEvent()); await theSession.SaveChangesAsync(); await daemon.WaitForNonStaleData(120.Seconds()); @@ -363,15 +363,15 @@ public class Projection1: SingleStreamProjection { public Projection1() { - DeleteEvent(); + DeleteEvent(); } - public void Apply(SideEffects1 aggregate, AEvent _) + public void Apply(SideEffects1 aggregate, MTAEvent _) { aggregate.A++; } - public void Apply(SideEffects1 aggregate, BEvent _) + public void Apply(SideEffects1 aggregate, MTBEvent _) { aggregate.B++; } @@ -380,10 +380,10 @@ public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEven { if (slice.Snapshot?.A % 5 == 0) { - slice.AppendEvent(new BEvent()); + slice.AppendEvent(new MTBEvent()); } - if (slice.Events().OfType>().Any()) + if (slice.Events().OfType>().Any()) { slice.PublishMessage(new WasDeleted(slice.Events().First().StreamId)); } @@ -407,12 +407,12 @@ public record WasDeleted(Guid Id); public class Projection2: SingleStreamProjection { - public void Apply(SideEffects2 aggregate, AEvent _) + public void Apply(SideEffects2 aggregate, MTAEvent _) { aggregate.A++; } - public void Apply(SideEffects2 aggregate, BEvent _) + public void Apply(SideEffects2 aggregate, MTBEvent _) { aggregate.B++; } @@ -421,7 +421,7 @@ public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEven { if (slice.Snapshot.A >= 5 && slice.Snapshot.B == 0) { - slice.AppendEvent(new BEvent()); + slice.AppendEvent(new MTBEvent()); } return new ValueTask(); @@ -430,19 +430,19 @@ public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEven public class Projection3: SingleStreamProjection { - public void Apply(SideEffects1 aggregate, AEvent _) + public void Apply(SideEffects1 aggregate, MTAEvent _) { aggregate.A++; } - public void Apply(SideEffects1 aggregate, BEvent _) + public void Apply(SideEffects1 aggregate, MTBEvent _) { } public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice slice) { - if (slice.Snapshot != null && slice.Events().OfType>().Any()) + if (slice.Snapshot != null && slice.Events().OfType>().Any()) { slice.PublishMessage(new GotB(slice.Snapshot.Id)); } diff --git a/src/DaemonTests/Bugs/Bug_2597_rebuilding_a_projection_with_no_matching_events_but_other_non_matching_events.cs b/src/DaemonTests/Bugs/Bug_2597_rebuilding_a_projection_with_no_matching_events_but_other_non_matching_events.cs index 6863100b82..a2d717240c 100644 --- a/src/DaemonTests/Bugs/Bug_2597_rebuilding_a_projection_with_no_matching_events_but_other_non_matching_events.cs +++ b/src/DaemonTests/Bugs/Bug_2597_rebuilding_a_projection_with_no_matching_events_but_other_non_matching_events.cs @@ -21,11 +21,11 @@ public async Task do_not_blow_up() using (var session = theStore.LightweightSession()) { - session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent()); - session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent()); - session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent()); - session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent()); - session.Events.Append(Guid.NewGuid(), new BEvent(), new CEvent(), new DEvent()); + session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent()); + session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent()); + session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent()); + session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent()); + session.Events.Append(Guid.NewGuid(), new MTBEvent(), new MTCEvent(), new MTDEvent()); await session.SaveChangesAsync(); } @@ -55,7 +55,7 @@ public class UsesAEventOnly public Guid Id { get; set; } public int Count { get; set; } - public void Apply(AEvent e) => Count++; + public void Apply(MTAEvent e) => Count++; } public class OtherAggregate @@ -63,7 +63,7 @@ public class OtherAggregate public Guid Id { get; set; } public int Count { get; set; } - public void Apply(AEvent e) => Count++; - public void Apply(BEvent e) => Count++; - public void Apply(CEvent e) => Count++; + public void Apply(MTAEvent e) => Count++; + public void Apply(MTBEvent e) => Count++; + public void Apply(MTCEvent e) => Count++; } diff --git a/src/DaemonTests/Bugs/Bug_3221_assert_on_wrong_identity_type_from_multi_stream_projection_to_slicer.cs b/src/DaemonTests/Bugs/Bug_3221_assert_on_wrong_identity_type_from_multi_stream_projection_to_slicer.cs index 1efd05a737..056e104456 100644 --- a/src/DaemonTests/Bugs/Bug_3221_assert_on_wrong_identity_type_from_multi_stream_projection_to_slicer.cs +++ b/src/DaemonTests/Bugs/Bug_3221_assert_on_wrong_identity_type_from_multi_stream_projection_to_slicer.cs @@ -39,10 +39,10 @@ public class MismatchedIdentityProjection : MultiStreamProjection>(c => c.TenantId); + Identity>(c => c.TenantId); } - public void Apply(Target state, IEvent e) => state.Number++; + public void Apply(Target state, IEvent e) => state.Number++; } diff --git a/src/DaemonTests/EventProjections/using_patches_in_async_mode.cs b/src/DaemonTests/EventProjections/using_patches_in_async_mode.cs index f6eea47ebe..b9286de986 100644 --- a/src/DaemonTests/EventProjections/using_patches_in_async_mode.cs +++ b/src/DaemonTests/EventProjections/using_patches_in_async_mode.cs @@ -28,13 +28,13 @@ public async Task do_some_patching() var id2 = Guid.NewGuid(); var id3 = Guid.NewGuid(); - theSession.Events.StartStream(id1, new StartAggregate(), new AEvent(), new AEvent(), new BEvent()); - theSession.Events.StartStream(id2, new StartAggregate(), new AEvent(), new CEvent(), new CEvent()); - theSession.Events.StartStream(id3, new StartAggregate(), new BEvent(), new BEvent(), new BEvent(), new CEvent()); + theSession.Events.StartStream(id1, new StartAggregate(), new MTAEvent(), new MTAEvent(), new MTBEvent()); + theSession.Events.StartStream(id2, new StartAggregate(), new MTAEvent(), new MTCEvent(), new MTCEvent()); + theSession.Events.StartStream(id3, new StartAggregate(), new MTBEvent(), new MTBEvent(), new MTBEvent(), new MTCEvent()); for (int i = 0; i < 100; i++) { - theSession.Events.StartStream(new StartAggregate(), new AEvent(), new AEvent(), new BEvent()); + theSession.Events.StartStream(new StartAggregate(), new MTAEvent(), new MTAEvent(), new MTBEvent()); } await theSession.SaveChangesAsync(); @@ -60,17 +60,17 @@ public class LetterPatcher: EventProjection { public SimpleAggregate Transform(IEvent e) => new SimpleAggregate { Id = e.StreamId }; - public void Project(IEvent e, IDocumentOperations ops) + public void Project(IEvent e, IDocumentOperations ops) { ops.Patch(e.StreamId).Increment(x => x.ACount); } - public void Project(IEvent e, IDocumentOperations ops) + public void Project(IEvent e, IDocumentOperations ops) { ops.Patch(e.StreamId).Increment(x => x.BCount); } - public void Project(IEvent e, IDocumentOperations ops) + public void Project(IEvent e, IDocumentOperations ops) { ops.Patch(e.StreamId).Increment(x => x.CCount); } diff --git a/src/DaemonTests/Internals/fetching_events.cs b/src/DaemonTests/Internals/fetching_events.cs index ec433fd487..28b18963bc 100644 --- a/src/DaemonTests/Internals/fetching_events.cs +++ b/src/DaemonTests/Internals/fetching_events.cs @@ -68,7 +68,7 @@ public async Task simple_fetch_with_guid_identifiers() var stream = Guid.NewGuid(); await executeAfterLoadingEvents(e => { - e.Append(stream, new AEvent(), new BEvent(), new CEvent(), new DEvent()); + e.Append(stream, new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent()); }); await theSession.SaveChangesAsync(); @@ -77,7 +77,7 @@ await executeAfterLoadingEvents(e => var @event = theRange.Events[0]; @event.StreamId.ShouldBe(stream); @event.Version.ShouldBe(1); - @event.Data.ShouldBeOfType(); + @event.Data.ShouldBeOfType(); } [Fact] @@ -88,7 +88,7 @@ public async Task simple_fetch_with_string_identifiers() var stream = Guid.NewGuid().ToString(); await executeAfterLoadingEvents(e => { - e.Append(stream, new AEvent(), new BEvent(), new CEvent(), new DEvent()); + e.Append(stream, new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent()); }); await theSession.SaveChangesAsync(); @@ -97,7 +97,7 @@ await executeAfterLoadingEvents(e => var @event = theRange.Events[0]; @event.StreamKey.ShouldBe(stream); @event.Version.ShouldBe(1); - @event.Data.ShouldBeOfType(); + @event.Data.ShouldBeOfType(); } [Fact] @@ -105,9 +105,9 @@ public async Task should_get_the_aggregate_type_name_if_exists() { await executeAfterLoadingEvents(e => { - e.Append(Guid.NewGuid(), new AEvent(), new BEvent(), new CEvent(), new DEvent()); - e.StartStream(Guid.NewGuid(), new AEvent(), new BEvent(), new CEvent(), new DEvent(), - new DEvent()); + e.Append(Guid.NewGuid(), new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent()); + e.StartStream(Guid.NewGuid(), new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent(), + new MTDEvent()); }); for (var i = 0; i < 4; i++) @@ -126,9 +126,9 @@ public async Task should_get_the_generic_aggregate_type_name_if_exists() { await executeAfterLoadingEvents(e => { - e.Append(Guid.NewGuid(), new AEvent(), new BEvent(), new CEvent(), new DEvent()); - e.StartStream>(Guid.NewGuid(), new AEvent(), new BEvent(), new CEvent(), new DEvent(), - new DEvent()); + e.Append(Guid.NewGuid(), new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent()); + e.StartStream>(Guid.NewGuid(), new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent(), + new MTDEvent()); }); for (var i = 0; i < 4; i++) @@ -150,9 +150,9 @@ public async Task filter_on_aggregate_type_name_if_exists() await executeAfterLoadingEvents(e => { - e.Append(Guid.NewGuid(), new AEvent(), new BEvent(), new CEvent(), new DEvent()); - e.StartStream(Guid.NewGuid(), new AEvent(), new BEvent(), new CEvent(), new DEvent(), - new DEvent()); + e.Append(Guid.NewGuid(), new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent()); + e.StartStream(Guid.NewGuid(), new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent(), + new MTDEvent()); }); theRange.Events.Count.ShouldBe(5); @@ -166,9 +166,9 @@ public async Task filter_on_generic_aggregate_type_name_if_exists() await executeAfterLoadingEvents(e => { - e.Append(Guid.NewGuid(), new AEvent(), new BEvent(), new CEvent(), new DEvent()); - e.StartStream>(Guid.NewGuid(), new AEvent(), new BEvent(), new CEvent(), new DEvent(), - new DEvent()); + e.Append(Guid.NewGuid(), new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent()); + e.StartStream>(Guid.NewGuid(), new MTAEvent(), new MTBEvent(), new MTCEvent(), new MTDEvent(), + new MTDEvent()); }); theRange.Events.Count.ShouldBe(5); diff --git a/src/DaemonTests/MultiTenancy/multi_tenancy_by_database.cs b/src/DaemonTests/MultiTenancy/multi_tenancy_by_database.cs index c3a6582ddb..23c269ba29 100644 --- a/src/DaemonTests/MultiTenancy/multi_tenancy_by_database.cs +++ b/src/DaemonTests/MultiTenancy/multi_tenancy_by_database.cs @@ -148,15 +148,15 @@ public async Task run_projections_end_to_end() var id = Guid.NewGuid(); await using var session1 = theStore.LightweightSession("tenant1"); - session1.Events.Append(id, new AEvent(), new BEvent(), new BEvent()); + session1.Events.Append(id, new MTAEvent(), new MTBEvent(), new MTBEvent()); await session1.SaveChangesAsync(); await using var session3 = theStore.LightweightSession("tenant3"); - session3.Events.Append(id, new AEvent(), new AEvent(), new BEvent(), new BEvent()); + session3.Events.Append(id, new MTAEvent(), new MTAEvent(), new MTBEvent(), new MTBEvent()); await session3.SaveChangesAsync(); await using var session4 = theStore.LightweightSession("tenant4"); - session4.Events.Append(id, new AEvent(), new BEvent(), new BEvent(), new BEvent()); + session4.Events.Append(id, new MTAEvent(), new MTBEvent(), new MTBEvent(), new MTBEvent()); await session4.SaveChangesAsync(); await (await theStore.Storage.FindOrCreateDatabase("tenant1")).Tracker.WaitForShardState("AllGood:All", 3); @@ -181,12 +181,12 @@ public MyAggregate Create(CreateEvent @event) return new MyAggregate { ACount = @event.A, BCount = @event.B, CCount = @event.C, DCount = @event.D }; } - public void Apply(AEvent @event, MyAggregate aggregate) + public void Apply(MTAEvent @event, MyAggregate aggregate) { aggregate.ACount++; } - public MyAggregate Apply(BEvent @event, MyAggregate aggregate) + public MyAggregate Apply(MTBEvent @event, MyAggregate aggregate) { return new MyAggregate { @@ -198,12 +198,12 @@ public MyAggregate Apply(BEvent @event, MyAggregate aggregate) }; } - public void Apply(MyAggregate aggregate, CEvent @event) + public void Apply(MyAggregate aggregate, MTCEvent @event) { aggregate.CCount++; } - public MyAggregate Apply(MyAggregate aggregate, DEvent @event) + public MyAggregate Apply(MyAggregate aggregate, MTDEvent @event) { return new MyAggregate { @@ -233,12 +233,12 @@ public MyAggregate Create(CreateEvent @event) return new MyAggregate { ACount = @event.A, BCount = @event.B, CCount = @event.C, DCount = @event.D }; } - public void Apply(AEvent @event, MyAggregate aggregate) + public void Apply(MTAEvent @event, MyAggregate aggregate) { aggregate.ACount++; } - public MyAggregate Apply(BEvent @event, MyAggregate aggregate) + public MyAggregate Apply(MTBEvent @event, MyAggregate aggregate) { return new MyAggregate { @@ -250,12 +250,12 @@ public MyAggregate Apply(BEvent @event, MyAggregate aggregate) }; } - public void Apply(MyAggregate aggregate, CEvent @event) + public void Apply(MyAggregate aggregate, MTCEvent @event) { aggregate.CCount++; } - public MyAggregate Apply(MyAggregate aggregate, DEvent @event) + public MyAggregate Apply(MyAggregate aggregate, MTDEvent @event) { return new MyAggregate { @@ -325,7 +325,7 @@ public interface ITabulator void Apply(MyAggregate aggregate); } -public class AEvent: ITabulator +public class MTAEvent: ITabulator { // Necessary for a couple tests. Let it go. public Guid Id { get; set; } @@ -338,7 +338,7 @@ public void Apply(MyAggregate aggregate) public Guid Tracker { get; } = Guid.NewGuid(); } -public class BEvent: ITabulator +public class MTBEvent: ITabulator { public void Apply(MyAggregate aggregate) { @@ -346,7 +346,7 @@ public void Apply(MyAggregate aggregate) } } -public class CEvent: ITabulator +public class MTCEvent: ITabulator { public void Apply(MyAggregate aggregate) { @@ -354,7 +354,7 @@ public void Apply(MyAggregate aggregate) } } -public class DEvent: ITabulator +public class MTDEvent: ITabulator { public void Apply(MyAggregate aggregate) { @@ -362,7 +362,7 @@ public void Apply(MyAggregate aggregate) } } -public class EEvent +public class MTEEvent { } diff --git a/src/DaemonTests/MultiTenancy/using_for_tenant_with_side_effects_and_subscriptions.cs b/src/DaemonTests/MultiTenancy/using_for_tenant_with_side_effects_and_subscriptions.cs index ba2dcfe720..ac6173352b 100644 --- a/src/DaemonTests/MultiTenancy/using_for_tenant_with_side_effects_and_subscriptions.cs +++ b/src/DaemonTests/MultiTenancy/using_for_tenant_with_side_effects_and_subscriptions.cs @@ -38,7 +38,7 @@ public async Task try_to_append_with_for_tenant_in_projection() }); using var session = theStore.LightweightSession("green"); - session.Events.StartStream([new AEvent(), new BEvent(), new CEvent()]); + session.Events.StartStream([new MTAEvent(), new MTBEvent(), new MTCEvent()]); await session.SaveChangesAsync(); using var daemon = await theStore.BuildProjectionDaemonAsync(); @@ -61,7 +61,7 @@ public async Task try_to_append_with_for_tenant_in_subscription() }); using var session = theStore.LightweightSession("green"); - session.Events.StartStream([new AEvent(), new BEvent(), new CEvent()]); + session.Events.StartStream([new MTAEvent(), new MTBEvent(), new MTCEvent()]); await session.SaveChangesAsync(); using var daemon = await theStore.BuildProjectionDaemonAsync(); diff --git a/src/DaemonTests/Resiliency/skipping_unknown_event_types_in_continuous_builds.cs b/src/DaemonTests/Resiliency/skipping_unknown_event_types_in_continuous_builds.cs index b6b7c63304..585d6640c2 100644 --- a/src/DaemonTests/Resiliency/skipping_unknown_event_types_in_continuous_builds.cs +++ b/src/DaemonTests/Resiliency/skipping_unknown_event_types_in_continuous_builds.cs @@ -35,7 +35,7 @@ public async Task InitializeAsync() opts.Connection(ConnectionSource.ConnectionString); opts.DisableNpgsqlLogging = true; opts.DatabaseSchemaName = "missing_events"; - opts.Events.MapEventType("TripleAAA"); + opts.Events.MapEventType("TripleAAA"); }).ApplyAllDatabaseChangesOnStartup(); }).StartAsync(); @@ -59,8 +59,8 @@ public async Task cleanly_skip_over_unknown_events() using (var session = store.LightweightSession()) { - session.Events.StartStream(streamId, new AEvent(), new AEvent(), new BEvent(), new BEvent(), - new CEvent(), new DEvent()); + session.Events.StartStream(streamId, new MTAEvent(), new MTAEvent(), new MTBEvent(), new MTBEvent(), + new MTCEvent(), new MTDEvent()); await session.SaveChangesAsync(); // Rig up a bad, unknown event type @@ -107,16 +107,16 @@ public override MyAggregate Evolve(MyAggregate snapshot, Guid id, IEvent e) snapshot ??= new MyAggregate(){ Id = id }; switch (e.Data) { - case AEvent: + case MTAEvent: snapshot.ACount++; break; - case BEvent: + case MTBEvent: snapshot.BCount++; break; - case CEvent: + case MTCEvent: snapshot.CCount++; break; - case DEvent: + case MTDEvent: snapshot.DCount++; break; } diff --git a/src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs b/src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs index 45998c9072..2db5192027 100644 --- a/src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs +++ b/src/DaemonTests/Subscriptions/subscriptions_end_to_end.cs @@ -666,8 +666,8 @@ public void subscription_wrapper_copies_the_filters_from_subscription_base() wrapper.IncludeArchivedEvents.ShouldBeTrue(); wrapper.IncludedEventTypes.Count.ShouldBe(2); - wrapper.IncludedEventTypes.ShouldContain(typeof(AEvent)); - wrapper.IncludedEventTypes.ShouldContain(typeof(BEvent)); + wrapper.IncludedEventTypes.ShouldContain(typeof(MTAEvent)); + wrapper.IncludedEventTypes.ShouldContain(typeof(MTBEvent)); } } @@ -675,8 +675,8 @@ public class FilteredSubscription: SubscriptionBase, IDisposable { public FilteredSubscription() { - IncludeType(); - IncludeType(); + IncludeType(); + IncludeType(); StreamType = typeof(SimpleAggregate); IncludeArchivedEvents = true; } @@ -707,8 +707,8 @@ public class FilteredSubscription2: SubscriptionBase, IAsyncDisposable { public FilteredSubscription2() { - IncludeType(); - IncludeType(); + IncludeType(); + IncludeType(); StreamType = typeof(SimpleAggregate); IncludeArchivedEvents = true; } diff --git a/src/DaemonTests/catching_up_mode_for_projections_and_subscriptions.cs b/src/DaemonTests/catching_up_mode_for_projections_and_subscriptions.cs new file mode 100644 index 0000000000..a81e01c071 --- /dev/null +++ b/src/DaemonTests/catching_up_mode_for_projections_and_subscriptions.cs @@ -0,0 +1,166 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Marten.Events.Aggregation; +using Marten.Events.Projections; +using Marten.Subscriptions; +using Marten.Testing.Harness; +using Shouldly; +using Xunit; + +namespace DaemonTests; + +public class catching_up_mode_for_projections_and_subscriptions : OneOffConfigurationsContext, IAsyncLifetime +{ + private EventStoreStatistics statistics; + private IReadOnlyList progress; + private Guid streamId; + + public async Task InitializeAsync() + { + StoreOptions(opts => + { + opts.Projections.Add(ProjectionLifecycle.Async); + opts.Events.Subscribe(new LetterEventsSubscription()); + opts.Projections.Add(ProjectionLifecycle.Async); + }); + + await theStore.Advanced.ResetAllData(); + LetterEventsSubscription.Clear(); + + streamId = theSession.Events.StartStream("ABCDEABBBBC".ToLetterEvents()).Id; + theSession.Events.StartStream("ABCBBC".ToLetterEvents()); + theSession.Events.StartStream("AEEEBBC".ToLetterEvents()); + theSession.Events.StartStream("AEEEEBBBC".ToLetterEvents()); + theSession.Events.StartStream("ACCCCCCBBC".ToLetterEvents()); + + await theSession.SaveChangesAsync(); + + using var daemon = await theStore.BuildProjectionDaemonAsync(); + await daemon.CatchUpAsync(CancellationToken.None); + + statistics = await theStore.Advanced.FetchEventStoreStatistics(); + progress = await theStore.Advanced.AllProjectionProgress(); + } + + public Task DisposeAsync() + { + return Task.CompletedTask; + } + + [Fact] + public async Task all_shards_advanced_to_the_high_water_mark() + { + foreach (var shardState in progress) + { + shardState.Sequence.ShouldBe(statistics.EventSequenceNumber); + } + } + + [Fact] + public async Task have_the_expected_documents_from_EventProjection() + { + // EventProjection ran + (await theSession.Query().CountAsync()).ShouldBe(6); + + var sequences = (await theSession.Events.QueryAllRawEvents().ToListAsync()) + .OfType>().Select(x => x.Sequence).ToArray(); + + var actuals = await theSession.Query().Select(x => x.Id).ToListAsync(); + + actuals.ShouldBe(sequences); + } + + [Fact] + public async Task aggregation_projection_can_catch_up() + { + // Aggregation too + (await theSession.Query().CountAsync()).ShouldBe(5); + var counts = await theSession.LoadAsync(streamId); + counts.ACount.ShouldBe(2); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(2); + counts.DCount.ShouldBe(1); + } + + [Fact] + public async Task subscription_catches_up() + { + // Subscription + LetterEventsSubscription.Events.Count.ShouldBe((int)statistics.EventCount); + LetterEventsSubscription.Events.Last().Sequence.ShouldBe(statistics.EventCount); + } + +} + +public class ADoc +{ + public long Id { get; set; } +} + +public class ADocEventProjection: EventProjection +{ + public void Project(IEvent e, IDocumentOperations ops) + { + ops.Store(new ADoc{Id = e.Sequence}); + } +} + +public class LetterEventsSubscription: SubscriptionBase +{ + public static List Events { get; } = new(); + + public static void Clear() + { + Events.Clear(); + } + + public override Task ProcessEventsAsync(EventRange page, ISubscriptionController controller, IDocumentOperations operations, + CancellationToken cancellationToken) + { + Events.AddRange(page.Events); + return Task.FromResult(NullChangeListener.Instance); + } +} + +public class LetterCountsProjection: SingleStreamProjection +{ + public override LetterCounts Evolve(LetterCounts snapshot, Guid id, IEvent e) + { + + switch (e.Data) + { + case AEvent _: + snapshot ??= new() { Id = id }; + snapshot.ACount++; + break; + + case BEvent _: + snapshot ??= new() { Id = id }; + snapshot.BCount++; + break; + + case CEvent _: + snapshot ??= new() { Id = id }; + snapshot.CCount++; + break; + + case DEvent _: + snapshot ??= new() { Id = id }; + snapshot.DCount++; + break; + } + + return snapshot; + } +} + + diff --git a/src/DocumentDbTests/SessionMechanics/ejecting_all_pending_changes.cs b/src/DocumentDbTests/SessionMechanics/ejecting_all_pending_changes.cs index d03100a7c3..23e3191ceb 100644 --- a/src/DocumentDbTests/SessionMechanics/ejecting_all_pending_changes.cs +++ b/src/DocumentDbTests/SessionMechanics/ejecting_all_pending_changes.cs @@ -29,15 +29,15 @@ public void will_clear_all_document_changes() #endregion } - public class AEvent{} - public class BEvent{} - public class CEvent{} + public class DBAEvent{} + public class DBBEvent{} + public class DBCEvent{} [Fact] public void will_clear_all_event_operations() { - theSession.Events.StartStream(Guid.NewGuid(), new AEvent(), new BEvent()); - theSession.Events.Append(Guid.NewGuid(), new CEvent()); + theSession.Events.StartStream(Guid.NewGuid(), new DBAEvent(), new DBBEvent()); + theSession.Events.Append(Guid.NewGuid(), new DBCEvent()); theSession.PendingChanges.Streams().Any().ShouldBeTrue(); diff --git a/src/EventSourcingTests/Aggregation/AggregationTestingSupport.cs b/src/EventSourcingTests/Aggregation/AggregationTestingSupport.cs index 9600527649..e29209c146 100644 --- a/src/EventSourcingTests/Aggregation/AggregationTestingSupport.cs +++ b/src/EventSourcingTests/Aggregation/AggregationTestingSupport.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using JasperFx.Core; using JasperFx.Events; using Marten.Events; @@ -54,6 +55,41 @@ public IEvent Add(T @event) } } +/// +/// Basically an ObjectMother for the A/B/C/D/Event types +/// +public static class LetterEvents +{ + public static IEnumerable ToLetterEvents(this string text) + { + foreach (var character in text.ToLowerInvariant()) + { + switch (character) + { + case 'a': + yield return new AEvent(); + break; + + case 'b': + yield return new BEvent(); + break; + + case 'c': + yield return new CEvent(); + break; + + case 'd': + yield return new DEvent(); + break; + + case 'e': + yield return new EEvent(); + break; + } + } + } +} + public class MyAggregate { diff --git a/src/Marten/Events/AsyncProjectionTestingExtensions.cs b/src/Marten/Events/AsyncProjectionTestingExtensions.cs index c0be77e7e9..ad5bfbe090 100644 --- a/src/Marten/Events/AsyncProjectionTestingExtensions.cs +++ b/src/Marten/Events/AsyncProjectionTestingExtensions.cs @@ -10,9 +10,13 @@ using JasperFx.Events.Daemon; using JasperFx.Events.Projections; using Marten.Events.Daemon; +using Marten.Events.Daemon.Coordination; using Marten.Services; using Marten.Storage; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; namespace Marten.Events; @@ -29,6 +33,17 @@ public static Task WaitForNonStaleProjectionDataAsync(this IHost host, TimeSpan return host.DocumentStore().WaitForNonStaleProjectionDataAsync(timeout); } + /// + /// Use with caution! This will try to wait for all projections to "catch up" to the currently + /// known farthest known sequence of the event store for the supplied "ancillary" store + /// + /// + /// + public static Task WaitForNonStaleProjectionDataAsync(this IHost host, TimeSpan timeout) where T : IDocumentStore + { + return host.DocumentStore().WaitForNonStaleProjectionDataAsync(timeout); + } + /// /// Wait for any running async daemon for a specific tenant id or database name to catch up to the latest event /// sequence at the time @@ -257,4 +272,131 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase $"The projections timed out before reaching the initial sequence of {highWaterMark}"); } } + + /// + /// Force any Marten async daemons to immediately advance to the latest changes. This is strictly + /// meant for test automation scenarios with small to medium sized databases + /// + /// + /// + /// Optionally control whether the projections and subscriptions should be restarted after they have caught up + /// + public static Task> ForceAllMartenDaemonActivityToCatchUpAsync(this IHost host, CancellationToken cancellation, + CatchUpMode mode = CatchUpMode.AndResumeNormally) + { + return host.Services.ForceAllMartenDaemonActivityToCatchUpAsync(cancellation, mode); + } + + /// + /// Force any Marten async daemons to immediately advance to the latest changes. This is strictly + /// meant for test automation scenarios with small to medium sized databases + /// + /// + /// + /// Optionally control whether the projections and subscriptions should be restarted after they have caught up + /// + public static async Task> ForceAllMartenDaemonActivityToCatchUpAsync(this IServiceProvider services, CancellationToken cancellation, + CatchUpMode mode = CatchUpMode.AndResumeNormally) + { + var logger = services.GetService>() ?? new NullLogger(); + var coordinator = services.GetRequiredService(); + var daemons = await coordinator.AllDaemonsAsync().ConfigureAwait(false); + + var list = new List(); + + foreach (var daemon in daemons) + { + try + { + await daemon.StopAllAsync().ConfigureAwait(false); + await daemon.CatchUpAsync(cancellation).ConfigureAwait(false); + + if (mode == CatchUpMode.AndResumeNormally) + { + await daemon.StartAllAsync().ConfigureAwait(false); + } + + logger.LogDebug("Executed a ProjectionDaemon.CatchUp() against {Daemon} in the main Marten store", daemon); + } + catch (Exception e) + { + logger.LogError(e, "Error trying to execute a CatchUp on {Daemon} in the main Marten store", daemon); + list.Add(e); + } + } + + return list; + } + + /// + /// Force any Marten async daemons for an ancillary Marten store to immediately advance to the latest changes. This is strictly + /// meant for test automation scenarios with small to medium sized databases + /// + /// + /// + /// Optionally control whether the projections and subscriptions should be restarted after they have caught up + /// + public static Task> ForceAllMartenDaemonActivityToCatchUpAsync(this IHost host, CancellationToken cancellation, + CatchUpMode mode = CatchUpMode.AndResumeNormally) where T : IDocumentStore + { + return host.Services.ForceAllMartenDaemonActivityToCatchUpAsync(cancellation, mode); + } + + /// + /// Force any Marten async daemons for an ancillary Marten store to immediately advance to the latest changes. This is strictly + /// meant for test automation scenarios with small to medium sized databases + /// + /// + /// + /// Optionally control whether the projections and subscriptions should be restarted after they have caught up + /// + public static async Task> ForceAllMartenDaemonActivityToCatchUpAsync(this IServiceProvider services, CancellationToken cancellation, + CatchUpMode mode = CatchUpMode.AndResumeNormally) where T : IDocumentStore + { + var logger = services.GetService>() ?? new NullLogger(); + var coordinator = services.GetRequiredService>(); + var daemons = await coordinator.AllDaemonsAsync().ConfigureAwait(false); + + var list = new List(); + + foreach (var daemon in daemons) + { + try + { + await daemon.StopAllAsync().ConfigureAwait(false); + await daemon.CatchUpAsync(cancellation).ConfigureAwait(false); + + if (mode == CatchUpMode.AndResumeNormally) + { + await daemon.StartAllAsync().ConfigureAwait(false); + } + + logger.LogDebug("Executed a ProjectionDaemon.CatchUp() against {Daemon} in Marten store {StoreType}", daemon, typeof(T).FullNameInCode()); + } + catch (Exception e) + { + logger.LogError(e, "Error trying to execute a CatchUp on {Daemon} in Marten store {StoreType}", daemon, typeof(T).FullNameInCode()); + list.Add(e); + } + } + + return list; + } } + + +public enum CatchUpMode +{ + /// + /// Default setting, in this case the projections and subscriptions will be restarted in normal operation + /// after the CatchUp operation is complete + /// + AndResumeNormally, + + /// + /// Do not resume the asynchronous projection or synchronous behavior after the CatchUp operation is complete + /// This may be useful for test automation + /// + AndDoNothing +} + diff --git a/src/Marten/Events/Daemon/Coordination/IProjectionCoordinator.cs b/src/Marten/Events/Daemon/Coordination/IProjectionCoordinator.cs index 9fcc03798a..1babc06ab8 100644 --- a/src/Marten/Events/Daemon/Coordination/IProjectionCoordinator.cs +++ b/src/Marten/Events/Daemon/Coordination/IProjectionCoordinator.cs @@ -1,17 +1,20 @@ -using System.Diagnostics.CodeAnalysis; +using System; +using System.Collections.Generic; using System.Threading.Tasks; +using JasperFx.Core.Reflection; using JasperFx.Events.Daemon; using Microsoft.Extensions.Hosting; namespace Marten.Events.Daemon.Coordination; -// TODO -- move this up in the namespace? public interface IProjectionCoordinator : IHostedService { // TODO -- add some convenience methods to get at various shards IProjectionDaemon DaemonForMainDatabase(); ValueTask DaemonForDatabase(string databaseIdentifier); + ValueTask> AllDaemonsAsync(); + /// /// Stops the projection coordinator's automatic restart logic and stops all running agents across all daemons. Does not release any held locks. /// diff --git a/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs b/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs index e00257a54f..eced4518d4 100644 --- a/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs +++ b/src/Marten/Events/Daemon/Coordination/ProjectionCoordinator.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -82,6 +83,12 @@ public async ValueTask DaemonForDatabase(string databaseIdent return findDaemonForDatabase(database); } + public async ValueTask> AllDaemonsAsync() + { + var all = await Store.Storage.AllDatabases().ConfigureAwait(false); + return all.OfType().Select(findDaemonForDatabase).ToList(); + } + public Task StartAsync(CancellationToken cancellationToken) { _cancellation?.SafeDispose(); diff --git a/src/Marten/Events/Daemon/ProjectionDaemon.cs b/src/Marten/Events/Daemon/ProjectionDaemon.cs index 1eeef4024b..509db11ed1 100644 --- a/src/Marten/Events/Daemon/ProjectionDaemon.cs +++ b/src/Marten/Events/Daemon/ProjectionDaemon.cs @@ -24,4 +24,9 @@ public ProjectionDaemon(DocumentStore store, MartenDatabase database, ILogger lo : base(store, database, logger, detector, store.Options.Projections) { } + + public override string ToString() + { + return $"Daemon: {Database.DatabaseUri} ({Database.Identifier})"; + } } diff --git a/src/Marten/HostExtensions.cs b/src/Marten/HostExtensions.cs index b22b000a9e..5bc619cd33 100644 --- a/src/Marten/HostExtensions.cs +++ b/src/Marten/HostExtensions.cs @@ -23,6 +23,42 @@ public static Task PauseAllDaemonsAsync(this IHost host) return coordinator.PauseAsync(); } + /// + /// Testing helper to pause all projection daemons in the system and completely + /// disable the daemon projection assignments for an ancillary store + /// + /// + /// + public static Task PauseAllDaemonsAsync(this IHost host) where T : IDocumentStore + { + var coordinator = host.Services.GetRequiredService>(); + return coordinator.PauseAsync(); + } + + /// + /// Testing helper to pause all projection daemons in the system and completely + /// disable the daemon projection assignments + /// + /// + /// + public static Task PauseAllDaemonsAsync(this IServiceProvider services) + { + var coordinator = services.GetRequiredService(); + return coordinator.PauseAsync(); + } + + /// + /// Testing helper to pause all projection daemons in the system and completely + /// disable the daemon projection assignments for an ancillary store + /// + /// + /// + public static Task PauseAllDaemonsAsync(this IServiceProvider services) where T : IDocumentStore + { + var coordinator = services.GetRequiredService>(); + return coordinator.PauseAsync(); + } + /// /// Testing helper to resume all projection daemons in the system and restart /// the daemon projection assignments @@ -45,6 +81,16 @@ public static IDocumentStore DocumentStore(this IHost host) return host.Services.GetRequiredService(); } + /// + /// Retrieve the Marten document store for this IHost + /// + /// + /// + public static IDocumentStore DocumentStore(this IServiceProvider services) + { + return services.GetRequiredService(); + } + /// /// Retrieve the Marten document store for this IHost when working with multiple Marten databases /// @@ -56,6 +102,16 @@ public static T DocumentStore(this IHost host) where T : IDocumentStore return host.Services.GetRequiredService(); } + /// + /// Retrieve the Marten document store for this IHost when working with multiple Marten databases + /// + /// + /// + public static T DocumentStore(this IServiceProvider services) where T : IDocumentStore + { + return services.GetRequiredService(); + } + /// /// Override the main Marten DocumentStore and any registered "ancillary" stores that are using the /// Async Daemon to run in "Solo" mode for faster and probably more reliable automated testing @@ -104,7 +160,8 @@ public static async Task CleanAllMartenDataAsync(this IHost host) where T : I } /// - /// Call DocumentStore.ResetAllData() on the document store in this host + /// Call DocumentStore.ResetAllData() on the document store in this host. This also pauses, then + /// resumes all asynchronous projection and subscription processing /// /// public static async Task ResetAllMartenDataAsync(this IHost host) @@ -140,6 +197,72 @@ public static async Task ResetAllMartenDataAsync(this IHost host) where T : I var store = host.DocumentStore(); await store.Advanced.ResetAllData(CancellationToken.None).ConfigureAwait(false); + if (coordinator != null) + { + await coordinator.ResumeAsync().ConfigureAwait(false); + } + } + + /// + /// Clean off all Marten data in the default DocumentStore for this host + /// + /// + public static async Task CleanAllMartenDataAsync(this IServiceProvider services) + { + var store = services.DocumentStore(); + await store.Advanced.Clean.DeleteAllDocumentsAsync(CancellationToken.None).ConfigureAwait(false); + await store.Advanced.Clean.DeleteAllEventDataAsync(CancellationToken.None).ConfigureAwait(false); + } + + /// + /// Clean off all Marten data in the specified DocumentStore for this host when working with multiple Marten databases + /// + /// + /// + public static async Task CleanAllMartenDataAsync(this IServiceProvider services) where T : IDocumentStore + { + var store = services.DocumentStore(); + await store.Advanced.Clean.DeleteAllDocumentsAsync(CancellationToken.None).ConfigureAwait(false); + await store.Advanced.Clean.DeleteAllEventDataAsync(CancellationToken.None).ConfigureAwait(false); + } + + /// + /// Call DocumentStore.ResetAllData() on the document store in this host + /// + /// + public static async Task ResetAllMartenDataAsync(this IServiceProvider services) + { + var coordinator = services.GetService(); + if (coordinator != null) + { + await coordinator.PauseAsync().ConfigureAwait(false); + } + + var store = services.DocumentStore(); + await store.Advanced.ResetAllData(CancellationToken.None).ConfigureAwait(false); + + if (coordinator != null) + { + await coordinator.ResumeAsync().ConfigureAwait(false); + } + } + + /// + /// Call DocumentStore.ResetAllData() on the document store in this host when working with multiple Marten databases + /// + /// + /// + public static async Task ResetAllMartenDataAsync(this IServiceProvider services) where T : IDocumentStore + { + var coordinator = services.GetService>(); + if (coordinator != null) + { + await coordinator.PauseAsync().ConfigureAwait(false); + } + + var store = services.DocumentStore(); + await store.Advanced.ResetAllData(CancellationToken.None).ConfigureAwait(false); + if (coordinator != null) { await coordinator.ResumeAsync().ConfigureAwait(false); diff --git a/src/Marten/Marten.csproj b/src/Marten/Marten.csproj index 2bc7c5ed6c..8eb529e183 100644 --- a/src/Marten/Marten.csproj +++ b/src/Marten/Marten.csproj @@ -33,10 +33,9 @@ - - - + + diff --git a/src/MultiTenancyTests/DocumentStore_IMartenStorage_implementation.cs b/src/MultiTenancyTests/DocumentStore_IMartenStorage_implementation.cs index a016907f9c..d293be56a0 100644 --- a/src/MultiTenancyTests/DocumentStore_IMartenStorage_implementation.cs +++ b/src/MultiTenancyTests/DocumentStore_IMartenStorage_implementation.cs @@ -71,7 +71,7 @@ public async Task InitializeAsync() opts.RegisterDocumentType(); opts.RegisterDocumentType(); - opts.Events.AddEventType(typeof(AEvent)); + opts.Events.AddEventType(typeof(RandomEvent)); }); }).StartAsync(); @@ -157,4 +157,4 @@ public async Task all_databases_can_return() } } -public record AEvent{} +public record RandomEvent{} diff --git a/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs b/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs index f289fe06a1..86935fcb4f 100644 --- a/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs +++ b/src/MultiTenancyTests/marten_managed_tenant_id_partitioning.cs @@ -175,7 +175,7 @@ public async Task should_not_build_storage_for_live_aggregations() opts.Projections.LiveStreamAggregation(); }, true); - var streamId = theSession.Events.StartStream(new AEvent(), new BEvent()).Id; + var streamId = theSession.Events.StartStream(new RandomEvent(), new BEvent()).Id; await theSession.SaveChangesAsync(); var aggregate = theSession.Events.AggregateStreamAsync(streamId); @@ -373,7 +373,7 @@ public class SimpleAggregate : IRevisioned public int DCount { get; set; } public int ECount { get; set; } - public void Apply(AEvent _) + public void Apply(RandomEvent _) { ACount++; }