From 1e52aa6640af9821f6ca8354b24b5043aa4fc091 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 4 Nov 2025 14:03:26 -0600 Subject: [PATCH] Multi-Stage Tracked Sessions Closes GH-1809 using latest Marten & JasperFx.Events Added working code for the 2 stage testing w/ Marten!!!! Added necessary registrations to the TrackedSessionConfiguration --- .../MartenTests/TestHelpers/AppendLetters.cs | 3 + .../MartenTests/TestHelpers/AppendLetters2.cs | 3 + .../TestHelpers/AppendLetters2Handler.cs | 27 +++ .../TestHelpers/AppendLettersHandler.cs | 26 +++ .../MartenTests/TestHelpers/EEvent.cs | 3 + .../MartenTests/TestHelpers/ILetterStore.cs | 8 + .../MartenTests/TestHelpers/LetterCounts.cs | 14 ++ .../TestHelpers/LetterCountsProjection.cs | 42 ++++ .../MartenTests/TestHelpers/LetterEvents.cs | 38 ++++ .../catch_up_and_then_do_nothing.cs | 121 +++++++++++ .../TestHelpers/catch_up_then_restart.cs | 121 +++++++++++ ...ch_up_when_using_wolverine_distribution.cs | 121 +++++++++++ .../TestHelpers/reset_data_first.cs | 123 +++++++++++ .../TestHelpers/second_stage_waiting.cs | 191 ++++++++++++++++++ .../wait_for_non_stale_data_after.cs | 121 +++++++++++ .../Distribution/EventStoreAgents.cs | 39 ++++ .../WolverineProjectionCoordinator.cs | 5 + src/Persistence/Wolverine.Marten/Events.cs | 8 + .../Wolverine.Marten/TestingExtensions.cs | 135 +++++++++++++ .../Wolverine.Marten/Wolverine.Marten.csproj | 2 +- .../Tracking/TrackedSession.Execution.cs | 144 +++++++++++++ src/Wolverine/Tracking/TrackedSession.cs | 75 ++----- .../Tracking/TrackedSessionConfiguration.cs | 35 ++++ 23 files changed, 1343 insertions(+), 62 deletions(-) create mode 100644 src/Persistence/MartenTests/TestHelpers/AppendLetters.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/AppendLetters2.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/AppendLetters2Handler.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/AppendLettersHandler.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/EEvent.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/ILetterStore.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/LetterCounts.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/LetterCountsProjection.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/LetterEvents.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/catch_up_and_then_do_nothing.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/catch_up_then_restart.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/catch_up_when_using_wolverine_distribution.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/reset_data_first.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/second_stage_waiting.cs create mode 100644 src/Persistence/MartenTests/TestHelpers/wait_for_non_stale_data_after.cs create mode 100644 src/Persistence/Wolverine.Marten/TestingExtensions.cs create mode 100644 src/Wolverine/Tracking/TrackedSession.Execution.cs diff --git a/src/Persistence/MartenTests/TestHelpers/AppendLetters.cs b/src/Persistence/MartenTests/TestHelpers/AppendLetters.cs new file mode 100644 index 000000000..a2a8ca36d --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/AppendLetters.cs @@ -0,0 +1,3 @@ +namespace MartenTests.TestHelpers; + +public record AppendLetters(Guid Id, string[] Events); \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/AppendLetters2.cs b/src/Persistence/MartenTests/TestHelpers/AppendLetters2.cs new file mode 100644 index 000000000..85c75b48d --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/AppendLetters2.cs @@ -0,0 +1,3 @@ +namespace MartenTests.TestHelpers; + +public record AppendLetters2(Guid Id, string[] Events); \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/AppendLetters2Handler.cs b/src/Persistence/MartenTests/TestHelpers/AppendLetters2Handler.cs new file mode 100644 index 000000000..938bfe559 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/AppendLetters2Handler.cs @@ -0,0 +1,27 @@ +using Wolverine; +using Wolverine.Marten; + +namespace MartenTests.TestHelpers; + +public static class AppendLetters2Handler +{ + [MartenStore(typeof(ILetterStore))] + public static (Events, OutgoingMessages) Handle( + AppendLetters2 command, + + [WriteAggregate(Required = false)] + LetterCounts aggregate) + { + switch (command.Events.Length) + { + case 0: + return ([], []); + + case 1: + return (new Events(command.Events[0].ToLetterEvents()), []); + + default: + return (new Events(command.Events[0].ToLetterEvents()), [new AppendLetters2(command.Id, command.Events.Skip(1).ToArray())]); + } + } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/AppendLettersHandler.cs b/src/Persistence/MartenTests/TestHelpers/AppendLettersHandler.cs new file mode 100644 index 000000000..adb182973 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/AppendLettersHandler.cs @@ -0,0 +1,26 @@ +using Wolverine; +using Wolverine.Marten; + +namespace MartenTests.TestHelpers; + +public static class AppendLettersHandler +{ + public static (Events, OutgoingMessages) Handle( + AppendLetters command, + + [WriteAggregate(Required = false)] + LetterCounts aggregate) + { + switch (command.Events.Length) + { + case 0: + return ([], []); + + case 1: + return (new Events(command.Events[0].ToLetterEvents()), []); + + default: + return (new Events(command.Events[0].ToLetterEvents()), [new AppendLetters(command.Id, command.Events.Skip(1).ToArray())]); + } + } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/EEvent.cs b/src/Persistence/MartenTests/TestHelpers/EEvent.cs new file mode 100644 index 000000000..bd33a6205 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/EEvent.cs @@ -0,0 +1,3 @@ +namespace MartenTests.TestHelpers; + +public record EEvent; \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/ILetterStore.cs b/src/Persistence/MartenTests/TestHelpers/ILetterStore.cs new file mode 100644 index 000000000..7791703b2 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/ILetterStore.cs @@ -0,0 +1,8 @@ +using Marten; + +namespace MartenTests.TestHelpers; + +public interface ILetterStore : IDocumentStore +{ + +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/LetterCounts.cs b/src/Persistence/MartenTests/TestHelpers/LetterCounts.cs new file mode 100644 index 000000000..d50d2c0f8 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/LetterCounts.cs @@ -0,0 +1,14 @@ +using JasperFx; + +namespace MartenTests.TestHelpers; + +public class LetterCounts: IRevisioned +{ + public Guid Id { get; set; } + public int ACount { get; set; } + public int BCount { get; set; } + public int CCount { get; set; } + public int DCount { get; set; } + public int ECount { get; set; } + public int Version { get; set; } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/LetterCountsProjection.cs b/src/Persistence/MartenTests/TestHelpers/LetterCountsProjection.cs new file mode 100644 index 000000000..86d7e812c --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/LetterCountsProjection.cs @@ -0,0 +1,42 @@ +using JasperFx.Events; +using Marten.Events.Aggregation; +using MartenTests.AggregateHandlerWorkflow; + +namespace MartenTests.TestHelpers; + +public class LetterCountsProjection: SingleStreamProjection +{ + public override LetterCounts Evolve(LetterCounts snapshot, Guid id, IEvent e) + { + + switch (e.Data) + { + case AEvent _: + snapshot ??= new() { Id = id }; + snapshot.ACount++; + break; + + case BEvent _: + snapshot ??= new() { Id = id }; + snapshot.BCount++; + break; + + case CEvent _: + snapshot ??= new() { Id = id }; + snapshot.CCount++; + break; + + case DEvent _: + snapshot ??= new() { Id = id }; + snapshot.DCount++; + break; + + case EEvent _: + snapshot ??= new() { Id = id }; + snapshot.ECount++; + break; + } + + return snapshot; + } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/LetterEvents.cs b/src/Persistence/MartenTests/TestHelpers/LetterEvents.cs new file mode 100644 index 000000000..0fbf2b0d2 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/LetterEvents.cs @@ -0,0 +1,38 @@ +using MartenTests.AggregateHandlerWorkflow; + +namespace MartenTests.TestHelpers; + +/// +/// Basically an ObjectMother for the A/B/C/D/Event types +/// +public static class LetterEvents +{ + public static IEnumerable ToLetterEvents(this string text) + { + foreach (var character in text.ToLowerInvariant()) + { + switch (character) + { + case 'a': + yield return new AEvent(); + break; + + case 'b': + yield return new BEvent(); + break; + + case 'c': + yield return new CEvent(); + break; + + case 'd': + yield return new DEvent(); + break; + + case 'e': + yield return new EEvent(); + break; + } + } + } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/catch_up_and_then_do_nothing.cs b/src/Persistence/MartenTests/TestHelpers/catch_up_and_then_do_nothing.cs new file mode 100644 index 000000000..5e09ceb6f --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/catch_up_and_then_do_nothing.cs @@ -0,0 +1,121 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.TestHelpers; + +public class catch_up_and_then_do_nothing : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters3"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).AddAsyncDaemon(DaemonMode.Solo).IntegrateWithWolverine(); + + opts.Services.AddMartenStore(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters4"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).IntegrateWithWolverine(); + + opts.Durability.Mode = DurabilityMode.Solo; + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } + + [Fact] + public async Task with_main_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndDoNothing) + .InvokeMessageAndWaitAsync(new AppendLetters(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } + + [Fact] + public async Task with_ancillary_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndDoNothing) + .InvokeMessageAndWaitAsync(new AppendLetters2(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/catch_up_then_restart.cs b/src/Persistence/MartenTests/TestHelpers/catch_up_then_restart.cs new file mode 100644 index 000000000..efdd28f86 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/catch_up_then_restart.cs @@ -0,0 +1,121 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.TestHelpers; + +public class catch_up_then_restart : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters3"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).AddAsyncDaemon(DaemonMode.Solo).IntegrateWithWolverine(); + + opts.Services.AddMartenStore(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters4"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).IntegrateWithWolverine(); + + opts.Durability.Mode = DurabilityMode.Solo; + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } + + [Fact] + public async Task with_main_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndResumeNormally) + .InvokeMessageAndWaitAsync(new AppendLetters(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } + + [Fact] + public async Task with_ancillary_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndResumeNormally) + .InvokeMessageAndWaitAsync(new AppendLetters2(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/catch_up_when_using_wolverine_distribution.cs b/src/Persistence/MartenTests/TestHelpers/catch_up_when_using_wolverine_distribution.cs new file mode 100644 index 000000000..f8f5a38d6 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/catch_up_when_using_wolverine_distribution.cs @@ -0,0 +1,121 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.TestHelpers; + +public class catch_up_when_using_wolverine_distribution : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters3"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).IntegrateWithWolverine(x => x.UseWolverineManagedEventSubscriptionDistribution = true); + + opts.Services.AddMartenStore(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters4"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).IntegrateWithWolverine(); + + opts.Durability.Mode = DurabilityMode.Solo; + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } + + [Fact] + public async Task with_main_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndDoNothing) + .InvokeMessageAndWaitAsync(new AppendLetters(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } + + [Fact] + public async Task with_ancillary_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndDoNothing) + .InvokeMessageAndWaitAsync(new AppendLetters2(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/reset_data_first.cs b/src/Persistence/MartenTests/TestHelpers/reset_data_first.cs new file mode 100644 index 000000000..24ee73a62 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/reset_data_first.cs @@ -0,0 +1,123 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.TestHelpers; + +public class reset_data_first : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).AddAsyncDaemon(DaemonMode.Solo).IntegrateWithWolverine(); + + opts.Services.AddMartenStore(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters2"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).IntegrateWithWolverine(); + + opts.Durability.Mode = DurabilityMode.Solo; + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } + + [Fact] + public async Task reset_all_data_upfront() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .InvokeMessageAndWaitAsync(new AppendLetters(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } + + [Fact] + public async Task reset_all_data_upfront_to_ancillary_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .InvokeMessageAndWaitAsync(new AppendLetters2(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/second_stage_waiting.cs b/src/Persistence/MartenTests/TestHelpers/second_stage_waiting.cs new file mode 100644 index 000000000..14e6a82c6 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/second_stage_waiting.cs @@ -0,0 +1,191 @@ +using System.Diagnostics; +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Events; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Marten.Events.Aggregation; +using MartenTests.AggregateHandlerWorkflow; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.TestHelpers; + +public class second_stage_waiting : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters3"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).AddAsyncDaemon(DaemonMode.Solo).IntegrateWithWolverine(); + + opts.Services.AddMartenStore(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters4"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).IntegrateWithWolverine(); + + opts.Durability.Mode = DurabilityMode.Solo; + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } + + [Fact] + public async Task with_main_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .PauseThenCatchUpOnMartenDaemonActivity(CatchUpMode.AndDoNothing) + .InvokeMessageAndWaitAsync(new AppendLetters(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + + tracked.Executed.SingleMessage().Id.ShouldBe(id); + tracked.Executed.SingleMessage().Id.ShouldBe(id); + } + + [Fact] + public async Task with_ancillary_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .Timeout(20.Seconds()) + .PauseThenCatchUpOnMartenDaemonActivity() + .InvokeMessageAndWaitAsync(new AppendLetters2(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + + tracked.Executed.SingleMessage().Id.ShouldBe(id); + tracked.Executed.SingleMessage().Id.ShouldBe(id); + } +} + + +public record GotFive(Guid Id); + +public record GotFiveResponse(Guid Id); + +public static class GotFiveHandler +{ + public static GotFiveResponse Handle(GotFive message) => new GotFiveResponse(message.Id); + + public static void Handle(GotFiveResponse m) => Debug.WriteLine("Got five response for " + m.Id); +} + +public class LetterCountsProjectionWithSideEffects: SingleStreamProjection +{ + public override LetterCounts Evolve(LetterCounts snapshot, Guid id, IEvent e) + { + + switch (e.Data) + { + case AEvent _: + snapshot ??= new() { Id = id }; + snapshot.ACount++; + break; + + case BEvent _: + snapshot ??= new() { Id = id }; + snapshot.BCount++; + break; + + case CEvent _: + snapshot ??= new() { Id = id }; + snapshot.CCount++; + break; + + case DEvent _: + snapshot ??= new() { Id = id }; + snapshot.DCount++; + break; + + case EEvent _: + snapshot ??= new() { Id = id }; + snapshot.ECount++; + break; + } + + return snapshot; + } + + public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice slice) + { + if (slice.Snapshot.ECount >= 5) + { + slice.PublishMessage(new GotFive(slice.Snapshot.Id)); + } + + return new ValueTask(); + } +} \ No newline at end of file diff --git a/src/Persistence/MartenTests/TestHelpers/wait_for_non_stale_data_after.cs b/src/Persistence/MartenTests/TestHelpers/wait_for_non_stale_data_after.cs new file mode 100644 index 000000000..477314478 --- /dev/null +++ b/src/Persistence/MartenTests/TestHelpers/wait_for_non_stale_data_after.cs @@ -0,0 +1,121 @@ +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Events.Daemon; +using JasperFx.Events.Projections; +using Marten; +using Marten.Events; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine; +using Wolverine.Marten; +using Wolverine.Tracking; + +namespace MartenTests.TestHelpers; + +public class wait_for_non_stale_data_after : IAsyncLifetime +{ + private IHost _host; + + public async Task InitializeAsync() + { + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Services.AddMarten(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters3"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).AddAsyncDaemon(DaemonMode.Solo).IntegrateWithWolverine(); + + opts.Services.AddMartenStore(m => + { + m.DisableNpgsqlLogging = true; + + m.Connection(Servers.PostgresConnectionString); + m.DatabaseSchemaName = "letters4"; + + m.Projections.Add(ProjectionLifecycle.Async); + }).IntegrateWithWolverine(); + + opts.Durability.Mode = DurabilityMode.Solo; + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + } + + [Fact] + public async Task with_main_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .WaitForNonStaleDaemonDataAfterExecution(5.Seconds()) + .InvokeMessageAndWaitAsync(new AppendLetters(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } + + [Fact] + public async Task with_ancillary_store() + { + await _host.ResetAllMartenDataAsync(); + + var id = Guid.NewGuid(); + + // Setting up some other aggregates first + using var session = _host.DocumentStore().LightweightSession(); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + session.Events.StartStream("AABBCCDDEE".ToLetterEvents()); + await session.SaveChangesAsync(); + await _host.WaitForNonStaleProjectionDataAsync(5.Seconds()); + + (await session.Query().CountAsync()).ShouldBe(3); + + + var tracked = await _host.TrackActivity() + .ResetAllMartenDataFirst() + .WaitForNonStaleDaemonDataAfterExecution(5.Seconds()) + .InvokeMessageAndWaitAsync(new AppendLetters2(id, ["AAAACCCCBDEEE", "ABCDECCC", "BBBA", "DDDAE"])); + + // Proving that previous data was wiped out + + var all = await session.Query().ToListAsync(); + var counts = all.Single(); + counts.Id.ShouldBe(id); + + counts.ACount.ShouldBe(7); + counts.BCount.ShouldBe(5); + counts.CCount.ShouldBe(8); + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Distribution/EventStoreAgents.cs b/src/Persistence/Wolverine.Marten/Distribution/EventStoreAgents.cs index 563e003d8..ac6ccfa49 100644 --- a/src/Persistence/Wolverine.Marten/Distribution/EventStoreAgents.cs +++ b/src/Persistence/Wolverine.Marten/Distribution/EventStoreAgents.cs @@ -140,6 +140,14 @@ public async Task StartAllAsync(CancellationToken cancellationToken) await daemon.StartAllAsync(); } + + if (usage.Database.MainDatabase != null) + { + var id = new DatabaseId(usage.Database.MainDatabase.ServerName, usage.Database.MainDatabase.DatabaseName); + var daemon = await FindDaemonAsync(id); + + await daemon.StartAllAsync(); + } } public async Task StopAllAsync(CancellationToken cancellationToken) @@ -150,6 +158,35 @@ public async Task StopAllAsync(CancellationToken cancellationToken) await daemon.StopAllAsync(); } } + + public async ValueTask> AllDaemonsAsync() + { + var usage = await _store.TryCreateUsage(CancellationToken.None); + if (usage == null) + { + return []; + } + + var list = new List(); + + foreach (var database in usage.Database.Databases) + { + var id = new DatabaseId(database.ServerName, database.DatabaseName); + var daemon = await FindDaemonAsync(id); + + list.Add(daemon); + } + + if (usage.Database.MainDatabase != null && usage.Database.Cardinality == DatabaseCardinality.Single) + { + var id = new DatabaseId(usage.Database.MainDatabase.ServerName, usage.Database.MainDatabase.DatabaseName); + var daemon = await FindDaemonAsync(id); + + list.Add(daemon); + } + + return list; + } public IProjectionDaemon DaemonForMainDatabase() { @@ -160,4 +197,6 @@ public ValueTask DaemonForDatabase(string databaseIdentifier) { throw new NotSupportedException("This method is not supported with the Wolverine managed projection/subscription distribution"); } + + } \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Distribution/WolverineProjectionCoordinator.cs b/src/Persistence/Wolverine.Marten/Distribution/WolverineProjectionCoordinator.cs index c48c39225..56acc0b83 100644 --- a/src/Persistence/Wolverine.Marten/Distribution/WolverineProjectionCoordinator.cs +++ b/src/Persistence/Wolverine.Marten/Distribution/WolverineProjectionCoordinator.cs @@ -46,6 +46,11 @@ public ValueTask DaemonForDatabase(string databaseIdentifier) return _storeAgents.DaemonForDatabase(databaseIdentifier); } + public ValueTask> AllDaemonsAsync() + { + return _storeAgents.AllDaemonsAsync(); + } + public Task PauseAsync() { return StopAsync(CancellationToken.None); diff --git a/src/Persistence/Wolverine.Marten/Events.cs b/src/Persistence/Wolverine.Marten/Events.cs index a95995e9a..cb76247ba 100644 --- a/src/Persistence/Wolverine.Marten/Events.cs +++ b/src/Persistence/Wolverine.Marten/Events.cs @@ -8,6 +8,14 @@ namespace Wolverine.Marten; /// public class Events : List, IWolverineReturnType { + public Events() + { + } + + public Events(IEnumerable collection) : base(collection) + { + } + public static Events operator +(Events events, object @event) { events.Add(@event); diff --git a/src/Persistence/Wolverine.Marten/TestingExtensions.cs b/src/Persistence/Wolverine.Marten/TestingExtensions.cs new file mode 100644 index 000000000..93065c7f9 --- /dev/null +++ b/src/Persistence/Wolverine.Marten/TestingExtensions.cs @@ -0,0 +1,135 @@ +using Marten; +using Marten.Events; +using Wolverine.Tracking; + +namespace Wolverine.Marten; + +public static class TestingExtensions +{ + /// + /// Reset all data in the main Marten store before running the execution + /// + /// + /// + public static TrackedSessionConfiguration ResetAllMartenDataFirst(this TrackedSessionConfiguration configuration) + { + return configuration.BeforeExecution(async (runtime, cancellation) => + { + await runtime.Services.ResetAllMartenDataAsync(); + }); + } + + /// + /// Reset all data in an ancillary Marten store before running the execution + /// + /// + /// + /// + public static TrackedSessionConfiguration ResetAllMartenDataFirst(this TrackedSessionConfiguration configuration) + where T : IDocumentStore + { + return configuration.BeforeExecution(async (runtime, cancellation) => + { + await runtime.Services.ResetAllMartenDataAsync(); + }); + } + + /// + /// Force any Marten projection or subscriptions normally running asynchronously + /// to "catch up" immediately after running the main execution for the main + /// Marten store + /// + /// + /// + /// + public static TrackedSessionConfiguration PauseThenCatchUpOnMartenDaemonActivity(this TrackedSessionConfiguration configuration, CatchUpMode mode = CatchUpMode.AndResumeNormally) + { + configuration.BeforeExecution(async (runtime, cancellation) => + { + await runtime.Services.PauseAllDaemonsAsync(); + }); + + return configuration.AddStage(async (runtime, _, cancellation) => + { + var exceptions = await runtime.Services.ForceAllMartenDaemonActivityToCatchUpAsync(cancellation, mode); + foreach (var e in exceptions) + { + runtime.MessageTracking.LogException(e); + } + }); + } + + /// + /// Force any Marten projection or subscriptions normally running asynchronously + /// to "catch up" immediately after running the main execution for an ancillary + /// Marten store + /// + /// + /// + /// + /// + public static TrackedSessionConfiguration PauseThenCatchUpOnMartenDaemonActivity(this TrackedSessionConfiguration configuration, CatchUpMode mode = CatchUpMode.AndResumeNormally) + where T : IDocumentStore + { + configuration.BeforeExecution(async (runtime, cancellation) => + { + await runtime.Services.PauseAllDaemonsAsync(); + }); + + return configuration.AddStage(async (runtime, _, cancellation) => + { + var exceptions = await runtime.Services.ForceAllMartenDaemonActivityToCatchUpAsync(cancellation, mode); + foreach (var exception in exceptions) + { + runtime.MessageTracking.LogException(exception); + } + }); + } + + /// + /// Wait for any ongoing Marten asynchronous projection or subscription activity to finish + /// after the main execution + /// + /// + /// + /// + public static TrackedSessionConfiguration WaitForNonStaleDaemonDataAfterExecution( + this TrackedSessionConfiguration configuration, TimeSpan timeout) + { + return configuration.AfterExecution(async (r, _) => + { + try + { + await r.Services.DocumentStore().WaitForNonStaleProjectionDataAsync(timeout); + } + catch (Exception e) + { + r.MessageTracking.LogException(e); + } + }); + } + + /// + /// Wait for any ongoing Marten asynchronous projection or subscription activity to finish + /// after the main execution for an ancillary Marten store + /// + /// + /// + /// + /// + public static TrackedSessionConfiguration WaitForNonStaleDaemonDataAfterExecution( + this TrackedSessionConfiguration configuration, TimeSpan timeout) where T : IDocumentStore + { + return configuration.AfterExecution(async (r, _) => + { + try + { + await r.Services.DocumentStore().WaitForNonStaleProjectionDataAsync(timeout); + } + catch (Exception e) + { + r.MessageTracking.LogException(e); + } + }); + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj index 8efeff42c..11ba02ffd 100644 --- a/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj +++ b/src/Persistence/Wolverine.Marten/Wolverine.Marten.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/Wolverine/Tracking/TrackedSession.Execution.cs b/src/Wolverine/Tracking/TrackedSession.Execution.cs new file mode 100644 index 000000000..083899e9b --- /dev/null +++ b/src/Wolverine/Tracking/TrackedSession.Execution.cs @@ -0,0 +1,144 @@ +using JasperFx.Core; +using Microsoft.Extensions.DependencyInjection; +using Wolverine.Runtime; + +namespace Wolverine.Tracking; + +internal partial class TrackedSession +{ + internal TrackedSession(TrackedSession parent, + Func func) + { + _stopwatch = parent._stopwatch; + _primaryHost = parent._primaryHost; + _otherHosts.AddRange(parent._otherHosts); + _primaryLogger = parent._primaryLogger; + _ignoreMessageRules.AddRange(parent._ignoreMessageRules); + _source = new(); + + AssertAnyFailureAcknowledgements = parent.AssertAnyFailureAcknowledgements; + AssertNoExceptions = false; + AlwaysTrackExternalTransports = parent.AlwaysTrackExternalTransports; + + Execution = c => func(_primaryLogger, c, _cancellation.Token); + } + + internal async Task executeStageAsync(Func execution) + { + await using var scope = _primaryHost.Services.CreateAsyncScope(); + var context = scope.ServiceProvider.GetRequiredService(); + await execution(context).WaitAsync(Timeout); + } + + public async Task ExecuteAndTrackAsync() + { + setActiveSession(this); + + foreach (var before in Befores) + { + await before(_primaryLogger, _cancellation.Token); + } + + _stopwatch.Start(); + + try + { + await executeStageAsync(Execution); + _executionComplete = true; + } + catch (TimeoutException e) + { + cleanUp(); + + var message = + BuildActivityMessage($"This {nameof(TrackedSession)} timed out before all activity completed."); + + throw new TimeoutException(message, e); + } + catch (Exception) + { + cleanUp(); + throw; + } + + // This is for race conditions if the activity manages to finish really fast + if (IsCompleted()) + { + Status = TrackingStatus.Completed; + } + else + { + startTimeoutTracking(); + await _source.Task; + } + + while (SecondaryStages.Any()) + { + var child = SecondaryStages.Dequeue(); + await child.ExecuteAsync(); + } + + cleanUp(); + + if (AssertNoExceptions) + { + AssertNoExceptionsWereThrown(); + } + + if (AssertAnyFailureAcknowledgements) + { + AssertNoFailureAcksWereSent(); + } + + if (AssertNoExceptions) + { + AssertNotTimedOut(); + } + } + + internal interface ISecondStateExecution + { + Task ExecuteAsync(); + } + + internal class SecondaryAction : ISecondStateExecution + { + private readonly TrackedSession _parent; + private readonly Func _func; + + public SecondaryAction(TrackedSession parent, Func func) + { + _parent = parent; + _func = func; + } + + public async Task ExecuteAsync() + { + await _func(_parent._primaryLogger, _parent._cancellation.Token); + } + } + + internal class SecondaryStage : ISecondStateExecution + { + private readonly TrackedSession _parent; + private readonly Func _func; + + public SecondaryStage(TrackedSession parent, Func func) + { + _parent = parent; + _func = func; + } + + public async Task ExecuteAsync() + { + var child = new TrackedSession(_parent, _func); + await child.ExecuteAndTrackAsync(); + + // Copy in children data + foreach (var envelopeHistory in child._envelopes) + { + _parent._envelopes[envelopeHistory.EnvelopeId] = envelopeHistory; + } + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Tracking/TrackedSession.cs b/src/Wolverine/Tracking/TrackedSession.cs index 6add5dc22..cc445ef9e 100644 --- a/src/Wolverine/Tracking/TrackedSession.cs +++ b/src/Wolverine/Tracking/TrackedSession.cs @@ -12,11 +12,11 @@ namespace Wolverine.Tracking; -internal class TrackedSession : ITrackedSession +internal partial class TrackedSession : ITrackedSession { private readonly IList _conditions = new List(); - private readonly Cache _envelopes = new(id => new EnvelopeHistory(id)); + private Cache _envelopes = new(id => new EnvelopeHistory(id)); private readonly IList _exceptions = new List(); @@ -28,9 +28,10 @@ internal class TrackedSession : ITrackedSession private bool _executionComplete; - private readonly Stopwatch _stopwatch = new(); + private Stopwatch _stopwatch = new(); private readonly List> _ignoreMessageRules = [t => t.CanBeCastTo()]; + private CancellationTokenSource _cancellation = new(); private TrackingStatus _status = TrackingStatus.Active; @@ -46,6 +47,13 @@ public TrackedSession(IServiceProvider services) : this(new HostWrapper(services } + // Actions to carry out first before execute and track + public List> Befores { get; } = new(); + + // All previous TrackedSessions + public List Previous { get; } = new(); + public Queue SecondaryStages { get; } = new(); + public TimeSpan Timeout { get; set; } = 5.Seconds(); public bool AssertNoExceptions { get; set; } = true; @@ -319,64 +327,7 @@ private void setActiveSession(TrackedSession? session) _primaryLogger.ActiveSession = session; foreach (var runtime in _otherHosts) runtime.ActiveSession = session; } - - public async Task ExecuteAndTrackAsync() - { - setActiveSession(this); - - _stopwatch.Start(); - - try - { - await using var scope = _primaryHost.Services.CreateAsyncScope(); - var context = scope.ServiceProvider.GetRequiredService(); - await Execution(context).WaitAsync(Timeout); - _executionComplete = true; - } - catch (TimeoutException e) - { - cleanUp(); - - var message = - BuildActivityMessage($"This {nameof(TrackedSession)} timed out before all activity completed."); - - throw new TimeoutException(message, e); - } - catch (Exception) - { - cleanUp(); - throw; - } - - // This is for race conditions if the activity manages to finish really fast - if (IsCompleted()) - { - Status = TrackingStatus.Completed; - } - else - { - startTimeoutTracking(); - await _source.Task; - } - - cleanUp(); - - if (AssertNoExceptions) - { - AssertNoExceptionsWereThrown(); - } - - if (AssertAnyFailureAcknowledgements) - { - AssertNoFailureAcksWereSent(); - } - - if (AssertNoExceptions) - { - AssertNotTimedOut(); - } - } - + public void AssertNoFailureAcksWereSent() { var records = AllRecordsInOrder().Where(x => x.Message is FailureAcknowledgement).ToArray(); @@ -418,6 +369,8 @@ private void startTimeoutTracking() await Task.Delay(Timeout); Status = TrackingStatus.TimedOut; + + await _cancellation.CancelAsync(); }, CancellationToken.None, TaskCreationOptions.RunContinuationsAsynchronously, TaskScheduler.Default); } diff --git a/src/Wolverine/Tracking/TrackedSessionConfiguration.cs b/src/Wolverine/Tracking/TrackedSessionConfiguration.cs index e72685ba5..a080c1a99 100644 --- a/src/Wolverine/Tracking/TrackedSessionConfiguration.cs +++ b/src/Wolverine/Tracking/TrackedSessionConfiguration.cs @@ -37,6 +37,41 @@ public TrackedSessionConfiguration IgnoreMessageType() return IgnoreMessagesMatchingType(t => t == typeof(T)); } + /// + /// Register an action that gets called before the actual execution + /// Typically meant for clean up work + /// + /// + /// + public TrackedSessionConfiguration BeforeExecution(Func before) + { + Session.Befores.Add(before); + return this; + } + + /// + /// Add a secondary stage of the tracked session that will execute like a nested tracked session + /// + /// + /// + public TrackedSessionConfiguration AddStage(Func stage) + { + Session.SecondaryStages.Enqueue(new TrackedSession.SecondaryStage(Session, stage)); + return this; + } + + /// + /// Add a secondary action against the current Wolverine application *after* the tracked session + /// has completed + /// + /// + /// + public TrackedSessionConfiguration AfterExecution(Func func) + { + Session.SecondaryStages.Enqueue(new TrackedSession.SecondaryAction(Session, func)); + return this; + } + /// /// Do not track any messages where the message type matches this filter. /// Helpful for polling operations that maybe happening during your testing