diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 966ea72e8c..fe42d6ef5c 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -206,6 +206,7 @@ const config: UserConfig = { text: 'Removing Protected Information', link: '/events/protection' }, + {text: 'Marking Events as Skipped', link: '/events/skipping'}, { text: 'Aggregates, events and repositories', link: '/scenarios/aggregates-events-repositories' diff --git a/docs/events/projections/async-daemon.md b/docs/events/projections/async-daemon.md index 60f49081c3..b1b2413cab 100644 --- a/docs/events/projections/async-daemon.md +++ b/docs/events/projections/async-daemon.md @@ -442,12 +442,14 @@ or subscription with the prefix: `marten.{Subscription or Projection Name}.{shar * `loading` -- traces the loading of a page of events for a projection or subscription. Same tags as above * `grouping` -- traces the grouping process for projections that happens prior to execution. This does not apply to subscriptions. Same tags as above -In addition, there are two metrics built for every combination of projection or subscription shard on each +In addition, there are three metrics built for every combination of projection or subscription shard on each Marten database (in the case of using separate databases for multi-tenancy), again using the same prefix as above with the addition of the Marten database identifier in the case of multi-tenancy through separate databases like `marten.{database name}.{projection or subscription name}.all.*: * `processed` - a counter giving you an indication of how many events are being processed by the currently running subscription or projection shard * `gap` - a histogram telling you the "gap" between the high water mark of the system and the furthest progression of the running subscription or projection. +* `skipped` - added in Marten 8.6, a counter telling you how many events were skipped during asynchronous projection or subscription processing. Depending on how the application is + configured, Marten may skip events because of serialization errors, unknown events, or application errors (basically, *your* code threw an exception) ::: tip The `gap` metrics are a good health check on the performance of any given projection or subscription. If this gap @@ -481,6 +483,21 @@ using multi-tenancy through a database per tenant. On these spans will be these There is also a counter metric called `marten.daemon.skipping` or `marten.[database name].daemon.skipping` that just emits and update every time that Marten has to "skip" stale events. +## Advanced Skipping Tracking + +::: info +This setting will be required and utilized by the forthcoming "CritterWatch" tool. +::: + +As part of some longer term planned improvements for Marten projection/subscription monitoring and potential +administrative "healing" functions, you can opt into having Marten write out an additional table +called `mt_high_water_skips` that tracks every time the high water detection has to "skip" over stale data. You can +use this information to "know" what streams and projections may be impacted by a skip. + +The flag for this is shown below: + +snippet: sample_enabling_advanced_tracking + ## Querying for Non Stale Data There are some potential benefits to running projections asynchronously, namely: diff --git a/docs/events/skipping.md b/docs/events/skipping.md new file mode 100644 index 0000000000..2e9331c2cf --- /dev/null +++ b/docs/events/skipping.md @@ -0,0 +1,32 @@ +# Marking Events as Skipped + +What if your code happens to append an event that turns out to be completely erroneous (not necessarily because of your code) and you wish you could +retroactively have it removed from the event store? You _could_ go and delete the event record directly from the `mt_events` +table, but maybe you have regulatory requirements that no events can ever be deleted -- which is the real world case that +spawned this feature. You _could_ also try a compensating event that effectively reverses the impact of the earlier, now +invalid event, but that requires more work and foresight on your part. + +Instead, you can mark events in a Marten event store as "skipped" such that these events are left as is in the database, +but will no longer be applied: + +1. In projections. You'd have to rebuild a projection that includes a skipped event to update the resulting projection though. +2. Subscriptions. If you rewind a subscription and replay it, the events marked as "skipped" are, well, skipped +3. `AggregateStreamAsync()` in all usages +4. `FetchLatest()` and `FetchForWriting()` usages, but again, you may have to rebuild a projection to take the skipped events out of the results + +::: tip +Definitely check out [Rebuilding a Single Stream](/events/projections/rebuilding.html#rebuilding-a-single-stream) for part of the recipe for "healing" a system from bad events. +::: + +To get started, you will first have to enable potential event skipping like this: + +snippet: sample_enabling_event_skipping + +That flag just enables the ability to mark events as _skipped_. As you'd imagine, that +flag alters Marten behavior by: + +1. Adds a new field called `is_skipped` to your `mt_events` table +2. Adds an additional filter on `is_skipped = FALSE` on many event store operations detailed above + +To mark events as skipped, you can either use raw SQL against your `mt_events` table, or +this helper API: diff --git a/src/DaemonTests/Internals/HighWaterDetectorTests.cs b/src/DaemonTests/Internals/detecting_the_high_water_mark.cs similarity index 80% rename from src/DaemonTests/Internals/HighWaterDetectorTests.cs rename to src/DaemonTests/Internals/detecting_the_high_water_mark.cs index a1c800342b..3a5c515271 100644 --- a/src/DaemonTests/Internals/HighWaterDetectorTests.cs +++ b/src/DaemonTests/Internals/detecting_the_high_water_mark.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; using DaemonTests.TestingSupport; @@ -19,12 +20,20 @@ namespace DaemonTests.Internals; public class HighWaterDetectorTests: DaemonContext { - private readonly HighWaterDetector theDetector; + private HighWaterDetector _detector; public HighWaterDetectorTests(ITestOutputHelper output) : base(output) { theStore.EnsureStorageExists(typeof(IEvent)); - theDetector = new HighWaterDetector((MartenDatabase)theStore.Tenancy.Default.Database, theStore.Events, NullLogger.Instance); + } + + internal HighWaterDetector theDetector + { + get + { + _detector ??= new HighWaterDetector((MartenDatabase)theStore.Tenancy.Default.Database, theStore.Events, NullLogger.Instance); + return _detector; + } } [Fact] @@ -66,9 +75,14 @@ public async Task starting_from_first_detection_some_gaps_with_zero_buffer() statistics.HighestSequence.ShouldBe(NumberOfEvents); } - [Fact] - public async Task second_run_detect_same_gap_when_stale() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task second_run_detect_same_gap_when_stale(bool useAdvancedTracking) { + StoreOptions(opts => opts.Events.EnableAdvancedAsyncTracking = useAdvancedTracking); + await theStore.EnsureStorageExistsAsync(typeof(IEvent)); + NumberOfStreams = 10; await PublishSingleThreaded(); @@ -82,9 +96,14 @@ public async Task second_run_detect_same_gap_when_stale() statistics.CurrentMark.ShouldBe(NumberOfEvents - 101); } - [Fact] - public async Task starting_from_first_detection_some_gaps_with_nonzero_buffer() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task starting_from_first_detection_some_gaps_with_nonzero_buffer(bool useAdvancedTracking) { + StoreOptions(opts => opts.Events.EnableAdvancedAsyncTracking = useAdvancedTracking); + await theStore.EnsureStorageExistsAsync(typeof(IEvent)); + NumberOfStreams = 10; await PublishSingleThreaded(); @@ -104,9 +123,14 @@ public async Task starting_from_first_detection_some_gaps_with_nonzero_buffer() statistics2.CurrentMark.ShouldBe(NumberOfEvents - 96); } - [Fact] - public async Task look_for_safe_harbor_time_if_there_are_gaps_between_highest_assigned_event_and_the_sequence() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task look_for_safe_harbor_time_if_there_are_gaps_between_highest_assigned_event_and_the_sequence(bool useAdvancedTracking) { + StoreOptions(opts => opts.Events.EnableAdvancedAsyncTracking = useAdvancedTracking); + await theStore.EnsureStorageExistsAsync(typeof(IEvent)); + NumberOfStreams = 10; await PublishSingleThreaded(); @@ -129,6 +153,12 @@ public async Task look_for_safe_harbor_time_if_there_are_gaps_between_highest_as // 20 + 20 - 32 = 8 statistics3.CurrentMark.ShouldBe(statistics.CurrentMark + 8); + + if (useAdvancedTracking) + { + var skips = await theDetector.FetchLastProgressionSkipsAsync(100, CancellationToken.None); + skips.Any().ShouldBeTrue(); + } } diff --git a/src/DaemonTests/Internals/fetching_events.cs b/src/DaemonTests/Internals/fetching_events.cs index b3c371bd13..ec433fd487 100644 --- a/src/DaemonTests/Internals/fetching_events.cs +++ b/src/DaemonTests/Internals/fetching_events.cs @@ -11,6 +11,7 @@ using Marten.Events.Daemon.Internals; using Marten.Storage; using Marten.Testing.Harness; +using NSubstitute; using Shouldly; using Weasel.Postgresql.SqlGeneration; using Xunit; @@ -25,7 +26,7 @@ public class fetching_events: OneOffConfigurationsContext, IAsyncLifetime public fetching_events() { - theRange = new EventRange(theShardName, 0, 100); + theRange = new EventRange(theShardName, 0, 100, Substitute.For()); } public Task InitializeAsync() diff --git a/src/DaemonTests/Internals/projection_progression_operations.cs b/src/DaemonTests/Internals/projection_progression_operations.cs index 4e94ffb6c5..5820e9fc2f 100644 --- a/src/DaemonTests/Internals/projection_progression_operations.cs +++ b/src/DaemonTests/Internals/projection_progression_operations.cs @@ -8,6 +8,7 @@ using Marten.Testing; using Marten.Testing.Documents; using Marten.Testing.Harness; +using NSubstitute; using Shouldly; using Xunit; @@ -58,7 +59,7 @@ public async Task update_happy_path() await theSession.SaveChangesAsync(); var updateProjectionProgress = - new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("three"), 12, 50)); + new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("three"), 12, 50, Substitute.For())); theSession.QueueOperation(updateProjectionProgress); await theSession.SaveChangesAsync(); @@ -83,7 +84,7 @@ public async Task Bug_2201_update_successfully_but_have_deletion_next() await theSession.SaveChangesAsync(); var updateProjectionProgress = - new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("three"), 12, 50)); + new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("three"), 12, 50, Substitute.For())); theSession.QueueOperation(updateProjectionProgress); theSession.Delete(target); @@ -102,7 +103,7 @@ public async Task update_sad_path() theSession.QueueOperation(insertProjectionProgress); await theSession.SaveChangesAsync(); - var updateProjectionProgress = new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("four"), 5, 50)); + var updateProjectionProgress = new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("four"), 5, 50, Substitute.For())); var ex = await Should.ThrowAsync(async () => { diff --git a/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs b/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs index 26f8d69d7e..2f7e304ab4 100644 --- a/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs +++ b/src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs @@ -14,6 +14,8 @@ using Marten.Schema; using Marten.Testing.Harness; using Marten.Util; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; using Npgsql; using Shouldly; using Xunit; @@ -144,6 +146,144 @@ public async Task see_the_dead_letter_events() } } +public class when_skipping_events_in_daemon_with_advanced_tracking : DaemonContext +{ + private readonly string[] theNames = + { + "Jane", + "Jill", + "Jack", + "JohnBad", + "JakeBad", + "JillBad", + "JohnBad", + "Derrick", + "Daniel", + "Donald", + "DonBad", + "Bob", + "Beck", + "BadName", + "Jeremy" + }; + + public when_skipping_events_in_daemon_with_advanced_tracking(ITestOutputHelper output) : base(output) + { + StoreOptions(opts => + { + opts.Events.DatabaseSchemaName = "daemon"; + opts.Projections.Add(ProjectionLifecycle.Async); + opts.Projections.Add(ProjectionLifecycle.Async); + + opts.Projections.RebuildErrors.SkipApplyErrors = true; + opts.Projections.Errors.SkipApplyErrors = true; + + opts.Events.EnableAdvancedAsyncTracking = true; + }); + } + + public static void enable_advanced_tracking_in_bootstrapping() + { + #region sample_enabling_advanced_tracking + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + opts.Events.EnableAdvancedAsyncTracking = true; + }); + + #endregion + } + + private async Task PublishTheEvents() + { + var daemon = await StartDaemon(); + + var waiter1 = daemon.Tracker.WaitForShardState("CollateNames:All", theNames.Length); + var waiter2 = daemon.Tracker.WaitForShardState("NamedDocuments:All", theNames.Length, 5.Minutes()); + + + var events = theNames.Select(name => new NameEvent {Name = name}) + .OfType() + .ToArray(); + + theSession.Events.StartStream(Guid.NewGuid(), events); + await theSession.SaveChangesAsync(); + + await daemon.Tracker.WaitForHighWaterMark(theNames.Length); + + await waiter1; + await waiter2; + + return daemon; + } + + [Fact] + public async Task the_shards_should_still_be_running() + { + var daemon = await PublishTheEvents(); + + var shards = daemon.CurrentAgents(); + + foreach (var shard in shards) + { + shard.Status.ShouldBe(AgentStatus.Running); + } + } + + [Fact] + public async Task skip_bad_events_in_event_projection() + { + await PublishTheEvents(); + + var names = await theSession.Query() + .OrderBy(x => x.Name) + .Select(x => x.Name) + .ToListAsync(); + + var expected = theNames + .OrderBy(x => x) + .Where(x => !x.Contains("bad", StringComparison.OrdinalIgnoreCase)) + .ToArray(); + + names.ShouldHaveTheSameElementsAs(expected); + } + + [Fact] + public async Task skip_bad_events_in_aggregate_projection() + { + await PublishTheEvents(); + + var jNames = await theSession.LoadAsync("J"); + + jNames.Names.OrderBy(x => x) + .ShouldHaveTheSameElementsAs("Jack", "Jane", "Jeremy", "Jill"); + } + + [Fact] + public async Task see_the_dead_letter_events() + { + var daemon = await PublishTheEvents(); + + // Drain the dead letter events queued up + await daemon.StopAllAsync(); + + theSession.Logger = new TestOutputMartenLogger(_output); + var skipped = await theSession.Query().ToListAsync(); + + skipped.Where(x => x.ProjectionName == "CollateNames" && x.ShardName == "All") + .Select(x => x.EventSequence).OrderBy(x => x) + .ShouldHaveTheSameElementsAs(4, 5, 6, 7); + + skipped.Where(x => x.ProjectionName == "NamedDocuments" && x.ShardName == "All") + .Select(x => x.EventSequence).OrderBy(x => x) + .ShouldHaveTheSameElementsAs(4, 5, 6, 7, 11, 14); + + } +} + public class ErrorRejectingEventProjection: EventProjection { public ErrorRejectingEventProjection() diff --git a/src/EventSourcingTests/EventGraphTests.cs b/src/EventSourcingTests/EventGraphTests.cs index 960e2fe8c2..5cd2bccc9d 100644 --- a/src/EventSourcingTests/EventGraphTests.cs +++ b/src/EventSourcingTests/EventGraphTests.cs @@ -195,6 +195,12 @@ public void add_event_directly() e.EventTypeName.ShouldNotBeNull(); } + [Fact] + public void enable_event_skipping_should_be_disabled_by_default() + { + theGraph.EnableEventSkippingInProjectionsOrSubscriptions.ShouldBeFalse(); + } + public class HouseRemodeling { public Guid Id { get; set; } diff --git a/src/EventSourcingTests/EventSourcingTests.csproj b/src/EventSourcingTests/EventSourcingTests.csproj index 58e18f9ba6..a96ac377f6 100644 --- a/src/EventSourcingTests/EventSourcingTests.csproj +++ b/src/EventSourcingTests/EventSourcingTests.csproj @@ -1,9 +1,4 @@ - - true - false - - diff --git a/src/EventSourcingTests/Examples/EventSkipping.cs b/src/EventSourcingTests/Examples/EventSkipping.cs new file mode 100644 index 0000000000..37e5e8179c --- /dev/null +++ b/src/EventSourcingTests/Examples/EventSkipping.cs @@ -0,0 +1,51 @@ +using System.Threading; +using System.Threading.Tasks; +using Marten; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; + + +namespace EventSourcingTests.Examples; + +public class EventSkipping +{ + public static void enable_event_skipping() + { + #region sample_enabling_event_skipping + + var builder = Host.CreateApplicationBuilder(); + builder.Services.AddMarten(opts => + { + opts.Connection(builder.Configuration.GetConnectionString("marten")); + + // This is false by default for backwards compatibility, + // turning this on will add an extra column and filtering during + // various event store operations + opts.Events.EnableEventSkippingInProjectionsOrSubscriptions = true; + }); + + #endregion + } + + public static async Task mark_events_as_skipped( + IDocumentStore store, + long[] sequences, + CancellationToken cancellation) + { + await store.Storage.Database.MarkEventsAsSkipped(sequences, cancellation); + } + + // If you're using multi-tenancy through separate databases, + // you'll need to use the correct database + public static async Task mark_events_as_skipped_by_tenant( + IDocumentStore store, + long[] sequences, + string tenantId, + CancellationToken cancellation) + { + var database = await store.Storage.FindOrCreateDatabase(tenantId); + await database.MarkEventsAsSkipped(sequences, cancellation); + } + + +} diff --git a/src/EventSourcingTests/advanced_async_tracking.cs b/src/EventSourcingTests/advanced_async_tracking.cs new file mode 100644 index 0000000000..bd573cc00a --- /dev/null +++ b/src/EventSourcingTests/advanced_async_tracking.cs @@ -0,0 +1,104 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventSourcingTests.FetchForWriting; +using JasperFx.Events; +using Marten.Events.Daemon.HighWater; +using Marten.Events.Projections; +using Marten.Storage; +using Marten.Testing.Harness; +using Microsoft.Extensions.Logging.Abstractions; +using Shouldly; +using Weasel.Core; +using Weasel.Postgresql.Tables; +using Xunit; + +namespace EventSourcingTests; + +public class advanced_async_tracking : OneOffConfigurationsContext, IAsyncLifetime +{ + private HighWaterDetector theDetector; + + public advanced_async_tracking() + { + StoreOptions(opts => + { + opts.Events.EnableAdvancedAsyncTracking = true; + opts.Projections.Snapshot(SnapshotLifecycle.Inline); + }); + } + + [Fact] + public async Task should_have_the_skips_table() + { + await theStore.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent)); + + var existingTables = await theStore.Storage.Database.SchemaTables(); + var skipTable = existingTables.Single(x => x.Name == "mt_high_water_skips"); + skipTable.ShouldNotBeNull(); + } + + [Fact] + public async Task should_have_the_mt_mark_progression_with_skip_function() + { + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + + var functions = await theStore.Storage.Database.Functions(); + functions.ShouldContain(new DbObjectName(SchemaName, "mt_mark_progression_with_skip")); + } + + [Fact] + public async Task can_apply_and_assert_correctly() + { + await theStore.Storage.ApplyAllConfiguredChangesToDatabaseAsync(); + await theStore.Storage.Database.AssertDatabaseMatchesConfigurationAsync(); + } + + public async Task InitializeAsync() + { + await theStore.Storage.Database.EnsureStorageExistsAsync(typeof(IEvent)); + theDetector = new HighWaterDetector((MartenDatabase)theStore.Storage.Database, theStore.Options.EventGraph, NullLogger.Instance); + } + + public Task DisposeAsync() + { + return Task.CompletedTask; + } + + [Fact] + public async Task try_mark_from_nothing_would_be_0() + { + var final = await theDetector.TryMarkHighWaterSkippingAsync(1000, 100, CancellationToken.None); + final.ShouldBe(0); + } + + [Fact] + public async Task happy_path_mark_the_skip() + { + await theDetector.MarkHighWaterMarkInDatabaseAsync(1000, CancellationToken.None); + + var latest = await theDetector.TryMarkHighWaterSkippingAsync(1100, 1000, CancellationToken.None); + + latest.ShouldBe(1100); + + var skips = await theDetector.FetchLastProgressionSkipsAsync(100, CancellationToken.None); + + skips[0].Ending.ShouldBe(1100); + skips[0].Starting.ShouldBe(1000); + + } + + [Fact] + public async Task sad_path_the_high_water_mark_has_moved() + { + await theDetector.MarkHighWaterMarkInDatabaseAsync(1100, CancellationToken.None); + + var latest = await theDetector.TryMarkHighWaterSkippingAsync(1050, 1000, CancellationToken.None); + + latest.ShouldBe(1100); + + var skips = await theDetector.FetchLastProgressionSkipsAsync(100, CancellationToken.None); + skips.Any().ShouldBeFalse(); + } +} diff --git a/src/EventSourcingTests/marking_events_as_skipped_as_Guid_identified.cs b/src/EventSourcingTests/marking_events_as_skipped_as_Guid_identified.cs new file mode 100644 index 0000000000..d12463f156 --- /dev/null +++ b/src/EventSourcingTests/marking_events_as_skipped_as_Guid_identified.cs @@ -0,0 +1,113 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using EventSourcingTests.FetchForWriting; +using JasperFx.Events; +using Marten; +using Marten.Events; +using Marten.Events.Schema; +using Marten.Testing; +using Marten.Testing.Harness; +using Npgsql; +using Shouldly; +using Xunit; +using Xunit.Abstractions; + +namespace EventSourcingTests; + +public class marking_events_as_skipped_as_Guid_identified : OneOffConfigurationsContext, IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private Guid theStreamId; + + public marking_events_as_skipped_as_Guid_identified(ITestOutputHelper output) + { + _output = output; + StoreOptions(opts => + { + opts.Events.EnableEventSkippingInProjectionsOrSubscriptions = true; + }); + } + + public async Task InitializeAsync() + { + theStreamId = theSession.Events.StartStream(new AEvent(), new BEvent(), new BEvent(), new CEvent(), new CEvent(), new CEvent()).Id; + await theSession.SaveChangesAsync(); + + var aggregate1 = await theSession.Events.AggregateStreamAsync(theStreamId); + + // CEvent have not been skipped + aggregate1.CCount.ShouldBe(3); + + var events = await theSession.Events.QueryAllRawEvents().Where(x => x.EventTypesAre(typeof(CEvent))) + .ToListAsync(); + + events.Any().ShouldBeTrue(); + + var sequences = events.Select(x => x.Sequence).ToArray(); + + await theStore.Storage.Database.MarkEventsAsSkipped(sequences); + } + + public Task DisposeAsync() + { + return Task.CompletedTask; + } + + [Fact] + public async Task has_the_is_skipped_column() + { + var table = new EventsTable(theStore.Events); + var column = table.ColumnFor("is_skipped"); + column.ShouldNotBeNull(); + + await theStore.EnsureStorageExistsAsync(typeof(IEvent)); + + using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + + var existing = await table.FetchExistingAsync(conn); + await conn.CloseAsync(); + + existing.ColumnFor("is_skipped").ShouldNotBeNull(); + } + + /* + * TODO -- test it with... + * Async + FetchLatest + * Async + FetchForWriting + * + * Do everything as Guid v. string identified + */ + + [Fact] + public async Task use_skip_for_realsies_and_live_aggregation() + { + var aggregate2 = await theSession.Events.AggregateStreamAsync(theStreamId); + + // All the CCount were skipped + aggregate2.CCount.ShouldBe(0); + } + + [Fact] + public async Task applies_to_fetch_latest_as_live() + { + var aggregate2 = await theSession.Events.FetchLatest(theStreamId); + + // All the CCount were skipped + aggregate2.CCount.ShouldBe(0); + } + + [Fact] + public async Task applies_to_fetch_fetch_for_writing_as_live() + { + var aggregate2 = await theSession.Events.FetchForWriting(theStreamId); + + // All the CCount were skipped + aggregate2.Aggregate.CCount.ShouldBe(0); + } + + + +} diff --git a/src/EventSourcingTests/marking_events_as_skipped_as_string_identified.cs b/src/EventSourcingTests/marking_events_as_skipped_as_string_identified.cs new file mode 100644 index 0000000000..573e831f9d --- /dev/null +++ b/src/EventSourcingTests/marking_events_as_skipped_as_string_identified.cs @@ -0,0 +1,113 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using EventSourcingTests.Aggregation; +using EventSourcingTests.FetchForWriting; +using JasperFx.Events; +using Marten.Events; +using Marten.Events.Schema; +using Marten.Testing; +using Marten.Testing.Harness; +using Npgsql; +using Shouldly; +using Xunit; +using Xunit.Abstractions; +using QueryableExtensions = Marten.QueryableExtensions; + +namespace EventSourcingTests; + +public class marking_events_as_skipped_as_string_identified : OneOffConfigurationsContext, IAsyncLifetime +{ + private readonly ITestOutputHelper _output; + private string theStreamId = Guid.NewGuid().ToString(); + + public marking_events_as_skipped_as_string_identified(ITestOutputHelper output) + { + _output = output; + StoreOptions(opts => + { + opts.Events.EnableEventSkippingInProjectionsOrSubscriptions = true; + opts.Events.StreamIdentity = StreamIdentity.AsString; + }); + } + + public async Task InitializeAsync() + { + theSession.Events.StartStream(theStreamId, new AEvent(), new BEvent(), new BEvent(), new CEvent(), new CEvent(), new CEvent()); + await theSession.SaveChangesAsync(); + + var aggregate1 = await theSession.Events.AggregateStreamAsync(theStreamId); + + // CEvent have not been skipped + aggregate1.CCount.ShouldBe(3); + + var events = await QueryableExtensions.ToListAsync(theSession.Events.QueryAllRawEvents().Where(x => LinqExtensions.EventTypesAre(x, typeof(CEvent)))); + + events.Any().ShouldBeTrue(); + + var sequences = events.Select(x => x.Sequence).ToArray(); + + await theStore.Storage.Database.MarkEventsAsSkipped(sequences); + } + + public Task DisposeAsync() + { + return Task.CompletedTask; + } + + [Fact] + public async Task has_the_is_skipped_column() + { + var table = new EventsTable(theStore.Events); + var column = table.ColumnFor("is_skipped"); + column.ShouldNotBeNull(); + + await theStore.EnsureStorageExistsAsync(typeof(IEvent)); + + using var conn = new NpgsqlConnection(ConnectionSource.ConnectionString); + await conn.OpenAsync(); + + var existing = await table.FetchExistingAsync(conn); + await conn.CloseAsync(); + + existing.ColumnFor("is_skipped").ShouldNotBeNull(); + } + + /* + * TODO -- test it with... + * Async + FetchLatest + * Async + FetchForWriting + * + * Do everything as Guid v. string identified + */ + + [Fact] + public async Task use_skip_for_realsies_and_live_aggregation() + { + var aggregate2 = await theSession.Events.AggregateStreamAsync(theStreamId); + + // All the CCount were skipped + aggregate2.CCount.ShouldBe(0); + } + + [Fact] + public async Task applies_to_fetch_latest_as_live() + { + var aggregate2 = await theSession.Events.FetchLatest(theStreamId); + + // All the CCount were skipped + aggregate2.CCount.ShouldBe(0); + } + + [Fact] + public async Task applies_to_fetch_fetch_for_writing_as_live() + { + var aggregate2 = await theSession.Events.FetchForWriting(theStreamId); + + // All the CCount were skipped + aggregate2.Aggregate.CCount.ShouldBe(0); + } + + + +} diff --git a/src/Marten/DocumentStore.EventStore.cs b/src/Marten/DocumentStore.EventStore.cs index e8acf45749..d26718dfea 100644 --- a/src/Marten/DocumentStore.EventStore.cs +++ b/src/Marten/DocumentStore.EventStore.cs @@ -251,6 +251,11 @@ private IEnumerable buildEventLoaderFilters(EventFilterable filter { yield return IsNotArchivedFilter.Instance; } + + if (Options.EventGraph.EnableEventSkippingInProjectionsOrSubscriptions) + { + yield return IsNotSkippedFilter.Instance; + } } IDocumentOperations IEventStore.OpenSession(IEventDatabase database) diff --git a/src/Marten/Events/Archiving/IsNotSkippedFilter.cs b/src/Marten/Events/Archiving/IsNotSkippedFilter.cs new file mode 100644 index 0000000000..16bfda0cff --- /dev/null +++ b/src/Marten/Events/Archiving/IsNotSkippedFilter.cs @@ -0,0 +1,25 @@ +using Weasel.Postgresql; +using Weasel.Postgresql.SqlGeneration; + +namespace Marten.Events.Archiving; + +internal class IsNotSkippedFilter: IReversibleWhereFragment +{ + private static readonly string _sql = $"d.is_skipped = FALSE"; + + public static readonly IsNotSkippedFilter Instance = new(); + + private IsNotSkippedFilter() + { + } + + public void Apply(ICommandBuilder builder) + { + builder.Append(_sql); + } + + public ISqlFragment Reverse() + { + return IsSkippedFilter.Instance; + } +} \ No newline at end of file diff --git a/src/Marten/Events/Archiving/IsSkippedFilter.cs b/src/Marten/Events/Archiving/IsSkippedFilter.cs new file mode 100644 index 0000000000..3f82485ee6 --- /dev/null +++ b/src/Marten/Events/Archiving/IsSkippedFilter.cs @@ -0,0 +1,25 @@ +using Weasel.Postgresql; +using Weasel.Postgresql.SqlGeneration; + +namespace Marten.Events.Archiving; + +internal class IsSkippedFilter: IReversibleWhereFragment +{ + private static readonly string _sql = $"d.is_skipped = TRUE"; + + public static readonly IsSkippedFilter Instance = new(); + + private IsSkippedFilter() + { + } + + public void Apply(ICommandBuilder builder) + { + builder.Append(_sql); + } + + public ISqlFragment Reverse() + { + return IsNotSkippedFilter.Instance; + } +} \ No newline at end of file diff --git a/src/Marten/Events/Daemon/HighWater/HighWaterDetector.cs b/src/Marten/Events/Daemon/HighWater/HighWaterDetector.cs index fe83497ad0..44a7f7d97c 100644 --- a/src/Marten/Events/Daemon/HighWater/HighWaterDetector.cs +++ b/src/Marten/Events/Daemon/HighWater/HighWaterDetector.cs @@ -1,4 +1,6 @@ using System; +using System.Collections.Generic; +using System.Data.Common; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -6,6 +8,7 @@ using JasperFx.Events.Daemon.HighWater; using JasperFx.Events.Projections; using Marten.Events.Projections; +using Marten.Events.Schema; using Marten.Services; using Marten.Storage; using Microsoft.Extensions.Logging; @@ -15,6 +18,12 @@ namespace Marten.Events.Daemon.HighWater; +internal enum DetectionType +{ + Normal, + SafeZoneSkipping +} + internal class HighWaterDetector: IHighWaterDetector { private readonly GapDetector _gapDetector; @@ -46,7 +55,7 @@ public HighWaterDetector(MartenDatabase runner, EventGraph graph, ILogger logger public async Task AdvanceHighWaterMarkToLatest(CancellationToken token) { var statistics = await loadCurrentStatistics(token).ConfigureAwait(false); - await markHighWaterMarkInDatabaseAsync(token, statistics.HighestSequence).ConfigureAwait(false); + await MarkHighWaterMarkInDatabaseAsync(statistics.HighestSequence, token).ConfigureAwait(false); } public string DatabaseIdentity { get; } @@ -94,7 +103,7 @@ public async Task DetectInSafeZone(CancellationToken token) } } - await calculateHighWaterMark(statistics, token).ConfigureAwait(false); + await calculateHighWaterMark(statistics, DetectionType.SafeZoneSkipping, token).ConfigureAwait(false); return statistics; } @@ -104,12 +113,13 @@ public async Task Detect(CancellationToken token) { var statistics = await loadCurrentStatistics(token).ConfigureAwait(false); - await calculateHighWaterMark(statistics, token).ConfigureAwait(false); + await calculateHighWaterMark(statistics, DetectionType.Normal, token).ConfigureAwait(false); return statistics; } - private async Task calculateHighWaterMark(HighWaterStatistics statistics, CancellationToken token) + private async Task calculateHighWaterMark(HighWaterStatistics statistics, DetectionType detectionType, + CancellationToken token) { // If the last high water mark is the same as the highest number // assigned from the sequence, then the high water mark cannot @@ -130,7 +140,32 @@ private async Task calculateHighWaterMark(HighWaterStatistics statistics, Cancel if (statistics.HasChanged) { var currentMark = statistics.CurrentMark; - await markHighWaterMarkInDatabaseAsync(token, currentMark).ConfigureAwait(false); + + if (detectionType == DetectionType.SafeZoneSkipping) + { + if (_graph.EnableAdvancedAsyncTracking) + { + var actual = await TryMarkHighWaterSkippingAsync(currentMark, statistics.LastMark, token).ConfigureAwait(false); + if (actual == currentMark) + { + statistics.IncludesSkipping = true; + } + else + { + statistics.CurrentMark = actual; + statistics.IncludesSkipping = false; + } + } + else + { + statistics.IncludesSkipping = true; + await MarkHighWaterMarkInDatabaseAsync(currentMark, token).ConfigureAwait(false); + } + } + else + { + await MarkHighWaterMarkInDatabaseAsync(currentMark, token).ConfigureAwait(false); + } if (!statistics.LastUpdated.HasValue) { @@ -140,14 +175,72 @@ private async Task calculateHighWaterMark(HighWaterStatistics statistics, Cancel } } - private async Task markHighWaterMarkInDatabaseAsync(CancellationToken token, long currentMark) + public Task MarkHighWaterMarkInDatabaseAsync(long currentMark, CancellationToken token) + { + return _runner.Query(new MarkHighWaterQueryHandler(_graph, currentMark), token); + } + + public class MarkHighWaterQueryHandler: ISingleQueryHandler { - await using var cmd = - new NpgsqlCommand( - $"select {_graph.DatabaseSchemaName}.mt_mark_event_progression('{ShardState.HighWaterMark}', :seq);") - .With("seq", currentMark); + private readonly EventGraph _graph; + private readonly long _currentMark; + + public MarkHighWaterQueryHandler(EventGraph graph, long currentMark) + { + _graph = graph; + _currentMark = currentMark; + } + + public NpgsqlCommand BuildCommand() + { + return new NpgsqlCommand( + $"select {_graph.DatabaseSchemaName}.mt_mark_event_progression('{ShardState.HighWaterMark}', :seq);") + .With("seq", _currentMark); + } - await _runner.SingleCommit(cmd, token).ConfigureAwait(false); + public Task HandleAsync(DbDataReader reader, CancellationToken token) + { + return Task.FromResult(true); + } + } + + public Task TryMarkHighWaterSkippingAsync(long endingMark, long currentMark, CancellationToken token) + { + if (endingMark <= currentMark) + throw new ArgumentOutOfRangeException(nameof(endingMark), + "Ending sequence should be greater than the current mark"); + + return _runner.Query(new TryMarkHighWaterSkippingHandler(_graph, endingMark, currentMark), token); + } + + public class TryMarkHighWaterSkippingHandler: ISingleQueryHandler + { + private readonly NpgsqlCommand _command; + + public TryMarkHighWaterSkippingHandler(EventGraph graph, long endingMark, long currentMark) + { + _command = new NpgsqlCommand( + $"select {graph.DatabaseSchemaName}.mt_mark_progression_with_skip('{ShardState.HighWaterMark}', :ending, :starting);") + .With("ending", endingMark) + .With("starting", currentMark) + ; + + } + + public NpgsqlCommand BuildCommand() + { + return _command; + } + + public async Task HandleAsync(DbDataReader reader, CancellationToken token) + { + if (await reader.ReadAsync(token).ConfigureAwait(false)) + { + return await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + } + + return 0; + } } public async Task TryCorrectProgressInDatabaseAsync(CancellationToken token) @@ -164,6 +257,11 @@ public async Task TryCorrectProgressInDatabaseAsync(CancellationToken token) } } + public Task> FetchLastProgressionSkipsAsync(int limit, CancellationToken cancellationToken) + { + return _runner.Query(new EventProgressionSkipsHandler(_graph, limit), cancellationToken); + } + private async Task loadCurrentStatistics(CancellationToken token) { return await _runner.Query(_highWaterStatisticsDetector, token).ConfigureAwait(false); @@ -198,7 +296,7 @@ private async Task findCurrentMark(HighWaterStatistics statistics, Cancell // This happens when the agent is restarted with persisted // state, and has no previous current mark. - if (statistics.CurrentMark == 0 && statistics.LastMark > 0) + if (statistics is { CurrentMark: 0, LastMark: > 0 }) { return statistics.LastMark; } diff --git a/src/Marten/Events/EventGraph.FeatureSchema.cs b/src/Marten/Events/EventGraph.FeatureSchema.cs index 2884162063..5ed0326997 100644 --- a/src/Marten/Events/EventGraph.FeatureSchema.cs +++ b/src/Marten/Events/EventGraph.FeatureSchema.cs @@ -64,5 +64,12 @@ private IEnumerable createAllSchemaObjects() var objects = schemaSource.CreateSchemaObjects(this); foreach (var schemaObject in objects) yield return schemaObject; } + + if (EnableAdvancedAsyncTracking) + { + yield return new EventProgressionSkippingTable(this); + yield return new SystemFunction(DatabaseSchemaName, "mt_mark_progression_with_skip", + "varchar, bigint, bigint"); + } } } diff --git a/src/Marten/Events/EventGraph.cs b/src/Marten/Events/EventGraph.cs index 142cccc0c4..8aba4f1c62 100644 --- a/src/Marten/Events/EventGraph.cs +++ b/src/Marten/Events/EventGraph.cs @@ -195,6 +195,14 @@ public IEvent BuildEvent(object eventData) /// public bool UseMandatoryStreamTypeDeclaration { get; set; } + public bool EnableAdvancedAsyncTracking { get; set; } + + /// + /// This is an "opt in" feature to add the capability to mark some events as "skipped" in the database + /// meaning that they do not apply to projections or subscriptions. Use this to "cure" bad events + /// + public bool EnableEventSkippingInProjectionsOrSubscriptions { get; set; } + /// /// Opt into using PostgreSQL list partitioning. This can have significant performance and scalability benefits /// *if* you are also aggressively using event stream archiving diff --git a/src/Marten/Events/EventStatement.cs b/src/Marten/Events/EventStatement.cs index 68d41a37a9..184ab678c2 100644 --- a/src/Marten/Events/EventStatement.cs +++ b/src/Marten/Events/EventStatement.cs @@ -4,12 +4,8 @@ using JasperFx.Core; using JasperFx.Events.Projections; using Marten.Events.Archiving; -using Marten.Events.Daemon; -using Marten.Events.Daemon.Internals; -using Marten.Linq; using Marten.Linq.SqlGeneration; using Marten.Storage; -using Weasel.Core; using Weasel.Postgresql; using Weasel.Postgresql.SqlGeneration; @@ -20,9 +16,10 @@ internal class EventStatement: SelectorStatement private const string ALL_TENANTS = "~ALL~"; private readonly IEventStorage _storage; - public EventStatement(IEventStorage storage) + public EventStatement(IEventStorage storage, EventGraph events) { _storage = storage; + IgnoreSkipped = events.EnableEventSkippingInProjectionsOrSubscriptions; } public ISqlFragment[] Filters { get; set; } = Array.Empty(); @@ -40,6 +37,7 @@ public EventStatement(IEventStorage storage) public Guid StreamId { get; set; } = Guid.Empty; public long FromVersion { get; set; } + public bool IgnoreSkipped { get; } protected override void configure(ICommandBuilder builder) { @@ -73,6 +71,11 @@ private IEnumerable filters() { yield return IsNotArchivedFilter.Instance; + if (IgnoreSkipped) + { + yield return IsNotSkippedFilter.Instance; + } + if (Range != null) { yield return new WhereFragment("d.seq_id > ?", Range.SequenceFloor); @@ -112,6 +115,3 @@ private IEnumerable filters() foreach (var filter in Filters) yield return filter; } } - - - diff --git a/src/Marten/Events/EventStore.FetchForWriting.cs b/src/Marten/Events/EventStore.FetchForWriting.cs index f5c3b4b898..42df11c0cb 100644 --- a/src/Marten/Events/EventStore.FetchForWriting.cs +++ b/src/Marten/Events/EventStore.FetchForWriting.cs @@ -52,7 +52,7 @@ IEventStream IEventIdentityStrategy.AppendToStream(TDoc? docum IQueryHandler> IEventIdentityStrategy.BuildEventQueryHandler(bool isGlobal, Guid id, IEventStorage selector, ISqlFragment? filter = null) { - var statement = new EventStatement(selector) { StreamId = id, TenantId = isGlobal ? StorageConstants.DefaultTenantId : _tenant.TenantId }; + var statement = new EventStatement(selector, _store.Options.EventGraph) { StreamId = id, TenantId = isGlobal ? StorageConstants.DefaultTenantId : _tenant.TenantId}; if (filter != null) { statement.Filters = [filter]; @@ -65,7 +65,7 @@ IQueryHandler> IEventIdentityStrategy.BuildEventQuer ISqlFragment? filter) { var selector = _store.Events.EnsureAsGuidStorage(_session); - var statement = new EventStatement(selector) { StreamId = id, TenantId = isGlobal ? StorageConstants.DefaultTenantId : _tenant.TenantId }; + var statement = new EventStatement(selector, _store.Options.EventGraph) { StreamId = id, TenantId = isGlobal ? StorageConstants.DefaultTenantId : _tenant.TenantId }; if (filter != null) { statement.Filters = [filter]; @@ -78,7 +78,7 @@ IQueryHandler> IEventIdentityStrategy.BuildEventQu ISqlFragment? filter) { var selector = _store.Events.EnsureAsStringStorage(_session); - var statement = new EventStatement(selector) { StreamKey = id, TenantId = isGlobal ? StorageConstants.DefaultTenantId : _tenant.TenantId }; + var statement = new EventStatement(selector, _store.Options.EventGraph) { StreamKey = id, TenantId = isGlobal ? StorageConstants.DefaultTenantId : _tenant.TenantId }; if (filter != null) { statement.Filters = [filter]; @@ -117,7 +117,7 @@ IEventStream IEventIdentityStrategy.AppendToStream(TDoc? doc IQueryHandler> IEventIdentityStrategy.BuildEventQueryHandler(bool isGlobal, string id, IEventStorage selector, ISqlFragment? filter = null) { - var statement = new EventStatement(selector) { StreamKey = id, TenantId = isGlobal ? StorageConstants.DefaultTenantId : _tenant.TenantId }; + var statement = new EventStatement(selector, _store.Options.EventGraph) { StreamKey = id, TenantId = isGlobal ? StorageConstants.DefaultTenantId : _tenant.TenantId }; if (filter != null) { statement.Filters = [filter]; diff --git a/src/Marten/Events/IEventStoreOptions.cs b/src/Marten/Events/IEventStoreOptions.cs index f3c6f6d29b..daa85d9990 100644 --- a/src/Marten/Events/IEventStoreOptions.cs +++ b/src/Marten/Events/IEventStoreOptions.cs @@ -65,6 +65,12 @@ public interface IEventStoreOptions public EventAppendMode AppendMode { get; set; } + /// + /// Opt into more robust tracking of asynchronous projection behavior. Default is false. This will add + /// extra tables, functions, and columns to your Marten event store schema + /// + public bool EnableAdvancedAsyncTracking { get; set; } + /// /// Opt into using PostgreSQL list partitioning. This can have significant performance and scalability benefits /// *if* you are also aggressively using event stream archiving @@ -96,6 +102,12 @@ public interface IEventStoreOptions /// public EventNamingStyle EventNamingStyle { get; set; } + /// + /// This is an "opt in" feature to add the capability to mark some events as "skipped" in the database + /// meaning that they do not apply to projections or subscriptions. Use this to "cure" bad events + /// + public bool EnableEventSkippingInProjectionsOrSubscriptions { get; set; } + /// /// Register an event type with Marten. This isn't strictly necessary for normal usage, /// but can help Marten with asynchronous projections where Marten hasn't yet encountered diff --git a/src/Marten/Events/IReadOnlyEventStoreOptions.cs b/src/Marten/Events/IReadOnlyEventStoreOptions.cs index 26ec148ea7..6ec5757ed8 100644 --- a/src/Marten/Events/IReadOnlyEventStoreOptions.cs +++ b/src/Marten/Events/IReadOnlyEventStoreOptions.cs @@ -91,4 +91,16 @@ public interface IReadOnlyEventStoreOptions /// Opt into different aliasing styles for .NET event types /// EventNamingStyle EventNamingStyle { get; set; } + + /// + /// Opt into more robust tracking of asynchronous projection behavior. Default is false. This will add + /// extra tables, functions, and columns to your Marten event store schema + /// + public bool EnableAdvancedAsyncTracking { get; set; } + + /// + /// This is an "opt in" feature to add the capability to mark some events as "skipped" in the database + /// meaning that they do not apply to projections or subscriptions. Use this to "cure" bad events + /// + bool EnableEventSkippingInProjectionsOrSubscriptions { get; set; } } diff --git a/src/Marten/Events/QueryEventStore.cs b/src/Marten/Events/QueryEventStore.cs index 576a9c7a65..bcf2cf1ace 100644 --- a/src/Marten/Events/QueryEventStore.cs +++ b/src/Marten/Events/QueryEventStore.cs @@ -35,7 +35,7 @@ public async Task> FetchStreamAsync(Guid streamId, long ve await _tenant.Database.EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); - var statement = new EventStatement(selector) + var statement = new EventStatement(selector, _store.Events) { StreamId = streamId, Version = version, @@ -56,7 +56,7 @@ public async Task> FetchStreamAsync(string streamKey, long await _tenant.Database.EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); - var statement = new EventStatement(selector) + var statement = new EventStatement(selector, _store.Events) { StreamKey = streamKey, Version = version, diff --git a/src/Marten/Events/Schema/EventProgressionSkippingTable.cs b/src/Marten/Events/Schema/EventProgressionSkippingTable.cs new file mode 100644 index 0000000000..a37aa33c4c --- /dev/null +++ b/src/Marten/Events/Schema/EventProgressionSkippingTable.cs @@ -0,0 +1,75 @@ +using System; +using System.Collections.Generic; +using System.Data.Common; +using System.IO; +using System.Threading; +using System.Threading.Tasks; +using Marten.Internal; +using Marten.Linq.QueryHandlers; +using Marten.Services; +using Npgsql; +using Weasel.Core; +using Weasel.Postgresql; +using Weasel.Postgresql.Tables; + +namespace Marten.Events.Schema; + +internal class EventProgressionSkippingTable: Table +{ + public const string Name = "mt_high_water_skips"; + + public EventProgressionSkippingTable(EventGraph eventGraph) : base(new PostgresqlObjectName(eventGraph.DatabaseSchemaName, Name)) + { + AddColumn("ending_sequence").AsPrimaryKey(); + AddColumn("starting_sequence").NotNull(); + AddColumn("timestamp", "timestamp with time zone") + .DefaultValueByExpression("(transaction_timestamp())"); + } +} + +public record HighWaterDetectionSkip(long Ending, long Starting, DateTimeOffset Timestamp); + +internal class EventProgressionSkipsHandler : ISingleQueryHandler> +{ + private readonly EventGraph _graph; + private readonly int _limit; + + public EventProgressionSkipsHandler(EventGraph graph, int limit) + { + _graph = graph; + _limit = limit; + } + + public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) + { + builder.Append(""); + builder.AppendParameter(_limit); + } + + public IReadOnlyList Handle(DbDataReader reader, IMartenSession session) + { + throw new NotSupportedException(); + } + + public NpgsqlCommand BuildCommand() + { + return new NpgsqlCommand( + $"select ending_sequence, starting_sequence, timestamp from {_graph.DatabaseSchemaName}.{EventProgressionSkippingTable.Name} order by ending_sequence desc limit :limit") + .With("limit", _limit); + } + + public async Task> HandleAsync(DbDataReader reader, CancellationToken token) + { + var list = new List(); + while (await reader.ReadAsync(token).ConfigureAwait(false)) + { + var ending = await reader.GetFieldValueAsync(0, token).ConfigureAwait(false); + var starting = await reader.GetFieldValueAsync(1, token).ConfigureAwait(false); + var timestamp = await reader.GetFieldValueAsync(2, token).ConfigureAwait(false); + + list.Add(new HighWaterDetectionSkip(ending, starting, timestamp)); + } + + return list; + } +} diff --git a/src/Marten/Events/Schema/EventProgressionTable.cs b/src/Marten/Events/Schema/EventProgressionTable.cs index e8bccce2d5..eff38b19fa 100644 --- a/src/Marten/Events/Schema/EventProgressionTable.cs +++ b/src/Marten/Events/Schema/EventProgressionTable.cs @@ -1,5 +1,7 @@ +using System; using JasperFx.Events.Projections; using Marten.Events.Daemon; +using Weasel.Core; using Weasel.Postgresql; using Weasel.Postgresql.Tables; diff --git a/src/Marten/Events/Schema/EventsTable.cs b/src/Marten/Events/Schema/EventsTable.cs index 94a9f07490..4a7416032e 100644 --- a/src/Marten/Events/Schema/EventsTable.cs +++ b/src/Marten/Events/Schema/EventsTable.cs @@ -3,6 +3,7 @@ using Marten.Events.Archiving; using Marten.Storage; using Marten.Storage.Metadata; +using Microsoft.CodeAnalysis.VisualBasic.Syntax; using Weasel.Postgresql; using Weasel.Postgresql.Tables; @@ -33,6 +34,11 @@ public EventsTable(EventGraph events): base(new PostgresqlObjectName(events.Data AddIfActive(events.Metadata.Headers); AddIfActive(events.Metadata.UserName); + if (events.EnableEventSkippingInProjectionsOrSubscriptions) + { + AddColumn("is_skipped").DefaultValueByExpression("FALSE"); + } + if (events.TenancyStyle == TenancyStyle.Conjoined) { if (events.UseArchivedStreamPartitioning) diff --git a/src/Marten/Marten.csproj b/src/Marten/Marten.csproj index 43f99795eb..8324d5872d 100644 --- a/src/Marten/Marten.csproj +++ b/src/Marten/Marten.csproj @@ -34,7 +34,7 @@ - + diff --git a/src/Marten/Schema/SQL/mt_mark_progression_with_skip.sql b/src/Marten/Schema/SQL/mt_mark_progression_with_skip.sql new file mode 100644 index 0000000000..1f1b0674eb --- /dev/null +++ b/src/Marten/Schema/SQL/mt_mark_progression_with_skip.sql @@ -0,0 +1,20 @@ +CREATE +OR REPLACE FUNCTION {databaseSchema}.mt_mark_progression_with_skip(shard_name varchar, ending_sequence bigint, starting_sequence bigint) RETURNS bigint AS +$$ +DECLARE + current_value bigint; +BEGIN + select last_seq_id into current_value from {databaseSchema}.mt_event_progression where name = shard_name; + + IF current_value is null then + return 0; + ELSIF current_value = starting_sequence THEN + update {databaseSchema}.mt_event_progression SET last_seq_id = ending_sequence, last_updated = transaction_timestamp() where shard_name = name; + insert into {databaseSchema}.mt_high_water_skips (ending_sequence, starting_sequence) values (ending_sequence, starting_sequence); + return ending_sequence; + ELSE + return current_value; + END IF; +END +$$ +LANGUAGE plpgsql; diff --git a/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs b/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs index 47030d0c71..29e041a927 100644 --- a/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs +++ b/src/Marten/Services/BatchQuerying/BatchedQuery.Events.cs @@ -44,7 +44,7 @@ public Task> FetchStream(Guid streamId, long version = 0, { _documentTypes.Add(typeof(IEvent)); var selector = Parent.EventStorage(); - var statement = new EventStatement(selector) + var statement = new EventStatement(selector, Parent.Options.EventGraph) { StreamId = streamId, Version = version, @@ -63,7 +63,7 @@ public Task> FetchStream(string streamKey, long version = { _documentTypes.Add(typeof(IEvent)); var selector = Parent.EventStorage(); - var statement = new EventStatement(selector) + var statement = new EventStatement(selector, Parent.Options.EventGraph) { StreamKey = streamKey, Version = version, diff --git a/src/Marten/Storage/IMartenDatabase.cs b/src/Marten/Storage/IMartenDatabase.cs index 85702eaaee..1364449419 100644 --- a/src/Marten/Storage/IMartenDatabase.cs +++ b/src/Marten/Storage/IMartenDatabase.cs @@ -140,6 +140,8 @@ Task ProjectionProgressFor(ShardName name, /// /// Task FetchHighestEventSequenceNumber(CancellationToken token = default); + + Task MarkEventsAsSkipped(long[] sequences, CancellationToken token = default); } public enum ConnectionUsage diff --git a/src/Marten/Storage/MartenDatabase.EventStorage.cs b/src/Marten/Storage/MartenDatabase.EventStorage.cs index a10ad3d0ff..8c8a0a90db 100644 --- a/src/Marten/Storage/MartenDatabase.EventStorage.cs +++ b/src/Marten/Storage/MartenDatabase.EventStorage.cs @@ -26,6 +26,25 @@ public partial class MartenDatabase : IEventDatabase { private string _storageIdentifier; + public async Task MarkEventsAsSkipped(long[] sequences, CancellationToken token = default) + { + await EnsureStorageExistsAsync(typeof(IEvent), token).ConfigureAwait(false); + + await using var conn = CreateConnection(); + try + { + await conn.OpenAsync(token).ConfigureAwait(false); + await conn.CreateCommand( + $"update {Options.EventGraph.DatabaseSchemaName}.mt_events set is_skipped = TRUE where seq_id = ANY(:sequences)") + .With("sequences", sequences) + .ExecuteNonQueryAsync(token).ConfigureAwait(false); + } + finally + { + await conn.CloseAsync().ConfigureAwait(false); + } + } + public async Task FindEventStoreFloorAtTimeAsync(DateTimeOffset timestamp, CancellationToken token) { var sql = diff --git a/src/Marten/Storage/Metadata/DocumentMetadata.cs b/src/Marten/Storage/Metadata/DocumentMetadata.cs index f6e270c1be..7d530fadeb 100644 --- a/src/Marten/Storage/Metadata/DocumentMetadata.cs +++ b/src/Marten/Storage/Metadata/DocumentMetadata.cs @@ -83,4 +83,9 @@ public DocumentMetadata(object id) /// Optional, user defined headers /// public Dictionary Headers { get; set; } = new(); + + /// + /// Only applies to events + /// + public bool IsSkipped { get; set; } } diff --git a/src/Marten/Storage/StandinDatabase.cs b/src/Marten/Storage/StandinDatabase.cs index 050795a1b7..866de48111 100644 --- a/src/Marten/Storage/StandinDatabase.cs +++ b/src/Marten/Storage/StandinDatabase.cs @@ -206,6 +206,16 @@ public async Task FetchHighestEventSequenceNumber(CancellationToken token throw new NotImplementedException(); } + public Task MarkEventsAsSkipped(long[] sequences, CancellationToken token = default) + { + throw new NotImplementedException(); + } + + public Task MarkEventAsSkipped(CancellationToken token, params long[] sequences) + { + return Task.CompletedTask; + } + public NpgsqlConnection CreateConnection(ConnectionUsage connectionUsage = ConnectionUsage.ReadWrite) { throw new NotImplementedException();