Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ const config: UserConfig<DefaultTheme.Config> = {
text: 'Removing Protected Information',
link: '/events/protection'
},
{text: 'Marking Events as Skipped', link: '/events/skipping'},
{
text: 'Aggregates, events and repositories',
link: '/scenarios/aggregates-events-repositories'
Expand Down
19 changes: 18 additions & 1 deletion docs/events/projections/async-daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -442,12 +442,14 @@ or subscription with the prefix: `marten.{Subscription or Projection Name}.{shar
* `loading` -- traces the loading of a page of events for a projection or subscription. Same tags as above
* `grouping` -- traces the grouping process for projections that happens prior to execution. This does not apply to subscriptions. Same tags as above

In addition, there are two metrics built for every combination of projection or subscription shard on each
In addition, there are three metrics built for every combination of projection or subscription shard on each
Marten database (in the case of using separate databases for multi-tenancy), again using the same prefix as above
with the addition of the Marten database identifier in the case of multi-tenancy through separate databases like `marten.{database name}.{projection or subscription name}.all.*:

* `processed` - a counter giving you an indication of how many events are being processed by the currently running subscription or projection shard
* `gap` - a histogram telling you the "gap" between the high water mark of the system and the furthest progression of the running subscription or projection.
* `skipped` - added in Marten 8.6, a counter telling you how many events were skipped during asynchronous projection or subscription processing. Depending on how the application is
configured, Marten may skip events because of serialization errors, unknown events, or application errors (basically, *your* code threw an exception)

::: tip
The `gap` metrics are a good health check on the performance of any given projection or subscription. If this gap
Expand Down Expand Up @@ -481,6 +483,21 @@ using multi-tenancy through a database per tenant. On these spans will be these
There is also a counter metric called `marten.daemon.skipping` or `marten.[database name].daemon.skipping`
that just emits and update every time that Marten has to "skip" stale events.

## Advanced Skipping Tracking <Badge type="tip" text="8.6" />

::: info
This setting will be required and utilized by the forthcoming "CritterWatch" tool.
:::

As part of some longer term planned improvements for Marten projection/subscription monitoring and potential
administrative "healing" functions, you can opt into having Marten write out an additional table
called `mt_high_water_skips` that tracks every time the high water detection has to "skip" over stale data. You can
use this information to "know" what streams and projections may be impacted by a skip.

The flag for this is shown below:

snippet: sample_enabling_advanced_tracking

## Querying for Non Stale Data

There are some potential benefits to running projections asynchronously, namely:
Expand Down
32 changes: 32 additions & 0 deletions docs/events/skipping.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Marking Events as Skipped <Badge type="tip" text="8.6" />

What if your code happens to append an event that turns out to be completely erroneous (not necessarily because of your code) and you wish you could
retroactively have it removed from the event store? You _could_ go and delete the event record directly from the `mt_events`
table, but maybe you have regulatory requirements that no events can ever be deleted -- which is the real world case that
spawned this feature. You _could_ also try a compensating event that effectively reverses the impact of the earlier, now
invalid event, but that requires more work and foresight on your part.

Instead, you can mark events in a Marten event store as "skipped" such that these events are left as is in the database,
but will no longer be applied:

1. In projections. You'd have to rebuild a projection that includes a skipped event to update the resulting projection though.
2. Subscriptions. If you rewind a subscription and replay it, the events marked as "skipped" are, well, skipped
3. `AggregateStreamAsync()` in all usages
4. `FetchLatest()` and `FetchForWriting()` usages, but again, you may have to rebuild a projection to take the skipped events out of the results

::: tip
Definitely check out [Rebuilding a Single Stream](/events/projections/rebuilding.html#rebuilding-a-single-stream) for part of the recipe for "healing" a system from bad events.
:::

To get started, you will first have to enable potential event skipping like this:

snippet: sample_enabling_event_skipping

That flag just enables the ability to mark events as _skipped_. As you'd imagine, that
flag alters Marten behavior by:

1. Adds a new field called `is_skipped` to your `mt_events` table
2. Adds an additional filter on `is_skipped = FALSE` on many event store operations detailed above

To mark events as skipped, you can either use raw SQL against your `mt_events` table, or
this helper API:
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DaemonTests.TestingSupport;
Expand All @@ -19,12 +20,20 @@ namespace DaemonTests.Internals;

public class HighWaterDetectorTests: DaemonContext
{
private readonly HighWaterDetector theDetector;
private HighWaterDetector _detector;

public HighWaterDetectorTests(ITestOutputHelper output) : base(output)
{
theStore.EnsureStorageExists(typeof(IEvent));
theDetector = new HighWaterDetector((MartenDatabase)theStore.Tenancy.Default.Database, theStore.Events, NullLogger.Instance);
}

internal HighWaterDetector theDetector
{
get
{
_detector ??= new HighWaterDetector((MartenDatabase)theStore.Tenancy.Default.Database, theStore.Events, NullLogger.Instance);
return _detector;
}
}

[Fact]
Expand Down Expand Up @@ -66,9 +75,14 @@ public async Task starting_from_first_detection_some_gaps_with_zero_buffer()
statistics.HighestSequence.ShouldBe(NumberOfEvents);
}

[Fact]
public async Task second_run_detect_same_gap_when_stale()
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task second_run_detect_same_gap_when_stale(bool useAdvancedTracking)
{
StoreOptions(opts => opts.Events.EnableAdvancedAsyncTracking = useAdvancedTracking);
await theStore.EnsureStorageExistsAsync(typeof(IEvent));

NumberOfStreams = 10;
await PublishSingleThreaded();

Expand All @@ -82,9 +96,14 @@ public async Task second_run_detect_same_gap_when_stale()
statistics.CurrentMark.ShouldBe(NumberOfEvents - 101);
}

[Fact]
public async Task starting_from_first_detection_some_gaps_with_nonzero_buffer()
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task starting_from_first_detection_some_gaps_with_nonzero_buffer(bool useAdvancedTracking)
{
StoreOptions(opts => opts.Events.EnableAdvancedAsyncTracking = useAdvancedTracking);
await theStore.EnsureStorageExistsAsync(typeof(IEvent));

NumberOfStreams = 10;
await PublishSingleThreaded();

Expand All @@ -104,9 +123,14 @@ public async Task starting_from_first_detection_some_gaps_with_nonzero_buffer()
statistics2.CurrentMark.ShouldBe(NumberOfEvents - 96);
}

[Fact]
public async Task look_for_safe_harbor_time_if_there_are_gaps_between_highest_assigned_event_and_the_sequence()
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task look_for_safe_harbor_time_if_there_are_gaps_between_highest_assigned_event_and_the_sequence(bool useAdvancedTracking)
{
StoreOptions(opts => opts.Events.EnableAdvancedAsyncTracking = useAdvancedTracking);
await theStore.EnsureStorageExistsAsync(typeof(IEvent));

NumberOfStreams = 10;
await PublishSingleThreaded();

Expand All @@ -129,6 +153,12 @@ public async Task look_for_safe_harbor_time_if_there_are_gaps_between_highest_as

// 20 + 20 - 32 = 8
statistics3.CurrentMark.ShouldBe(statistics.CurrentMark + 8);

if (useAdvancedTracking)
{
var skips = await theDetector.FetchLastProgressionSkipsAsync(100, CancellationToken.None);
skips.Any().ShouldBeTrue();
}
}


Expand Down
3 changes: 2 additions & 1 deletion src/DaemonTests/Internals/fetching_events.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Marten.Events.Daemon.Internals;
using Marten.Storage;
using Marten.Testing.Harness;
using NSubstitute;
using Shouldly;
using Weasel.Postgresql.SqlGeneration;
using Xunit;
Expand All @@ -25,7 +26,7 @@ public class fetching_events: OneOffConfigurationsContext, IAsyncLifetime

public fetching_events()
{
theRange = new EventRange(theShardName, 0, 100);
theRange = new EventRange(theShardName, 0, 100, Substitute.For<ISubscriptionAgent>());
}

public Task InitializeAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Marten.Testing;
using Marten.Testing.Documents;
using Marten.Testing.Harness;
using NSubstitute;
using Shouldly;
using Xunit;

Expand Down Expand Up @@ -58,7 +59,7 @@ public async Task update_happy_path()
await theSession.SaveChangesAsync();

var updateProjectionProgress =
new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("three"), 12, 50));
new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("three"), 12, 50, Substitute.For<ISubscriptionAgent>()));

theSession.QueueOperation(updateProjectionProgress);
await theSession.SaveChangesAsync();
Expand All @@ -83,7 +84,7 @@ public async Task Bug_2201_update_successfully_but_have_deletion_next()
await theSession.SaveChangesAsync();

var updateProjectionProgress =
new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("three"), 12, 50));
new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("three"), 12, 50, Substitute.For<ISubscriptionAgent>()));

theSession.QueueOperation(updateProjectionProgress);
theSession.Delete(target);
Expand All @@ -102,7 +103,7 @@ public async Task update_sad_path()
theSession.QueueOperation(insertProjectionProgress);
await theSession.SaveChangesAsync();

var updateProjectionProgress = new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("four"), 5, 50));
var updateProjectionProgress = new UpdateProjectionProgress(theStore.Events, new EventRange(new ShardName("four"), 5, 50, Substitute.For<ISubscriptionAgent>()));

var ex = await Should.ThrowAsync<ProgressionProgressOutOfOrderException>(async () =>
{
Expand Down
140 changes: 140 additions & 0 deletions src/DaemonTests/Resiliency/when_skipping_events_in_daemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
using Marten.Schema;
using Marten.Testing.Harness;
using Marten.Util;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Shouldly;
using Xunit;
Expand Down Expand Up @@ -144,6 +146,144 @@ public async Task see_the_dead_letter_events()
}
}

public class when_skipping_events_in_daemon_with_advanced_tracking : DaemonContext
{
private readonly string[] theNames =
{
"Jane",
"Jill",
"Jack",
"JohnBad",
"JakeBad",
"JillBad",
"JohnBad",
"Derrick",
"Daniel",
"Donald",
"DonBad",
"Bob",
"Beck",
"BadName",
"Jeremy"
};

public when_skipping_events_in_daemon_with_advanced_tracking(ITestOutputHelper output) : base(output)
{
StoreOptions(opts =>
{
opts.Events.DatabaseSchemaName = "daemon";
opts.Projections.Add<ErrorRejectingEventProjection>(ProjectionLifecycle.Async);
opts.Projections.Add<CollateNames>(ProjectionLifecycle.Async);

opts.Projections.RebuildErrors.SkipApplyErrors = true;
opts.Projections.Errors.SkipApplyErrors = true;

opts.Events.EnableAdvancedAsyncTracking = true;
});
}

public static void enable_advanced_tracking_in_bootstrapping()
{
#region sample_enabling_advanced_tracking

var builder = Host.CreateApplicationBuilder();
builder.Services.AddMarten(opts =>
{
opts.Connection(builder.Configuration.GetConnectionString("marten"));

opts.Events.EnableAdvancedAsyncTracking = true;
});

#endregion
}

private async Task<IProjectionDaemon> PublishTheEvents()
{
var daemon = await StartDaemon();

var waiter1 = daemon.Tracker.WaitForShardState("CollateNames:All", theNames.Length);
var waiter2 = daemon.Tracker.WaitForShardState("NamedDocuments:All", theNames.Length, 5.Minutes());


var events = theNames.Select(name => new NameEvent {Name = name})
.OfType<object>()
.ToArray();

theSession.Events.StartStream(Guid.NewGuid(), events);
await theSession.SaveChangesAsync();

await daemon.Tracker.WaitForHighWaterMark(theNames.Length);

await waiter1;
await waiter2;

return daemon;
}

[Fact]
public async Task the_shards_should_still_be_running()
{
var daemon = await PublishTheEvents();

var shards = daemon.CurrentAgents();

foreach (var shard in shards)
{
shard.Status.ShouldBe(AgentStatus.Running);
}
}

[Fact]
public async Task skip_bad_events_in_event_projection()
{
await PublishTheEvents();

var names = await theSession.Query<NamedDocument>()
.OrderBy(x => x.Name)
.Select(x => x.Name)
.ToListAsync();

var expected = theNames
.OrderBy(x => x)
.Where(x => !x.Contains("bad", StringComparison.OrdinalIgnoreCase))
.ToArray();

names.ShouldHaveTheSameElementsAs(expected);
}

[Fact]
public async Task skip_bad_events_in_aggregate_projection()
{
await PublishTheEvents();

var jNames = await theSession.LoadAsync<NamesByLetter>("J");

jNames.Names.OrderBy(x => x)
.ShouldHaveTheSameElementsAs("Jack", "Jane", "Jeremy", "Jill");
}

[Fact]
public async Task see_the_dead_letter_events()
{
var daemon = await PublishTheEvents();

// Drain the dead letter events queued up
await daemon.StopAllAsync();

theSession.Logger = new TestOutputMartenLogger(_output);
var skipped = await theSession.Query<DeadLetterEvent>().ToListAsync();

skipped.Where(x => x.ProjectionName == "CollateNames" && x.ShardName == "All")
.Select(x => x.EventSequence).OrderBy(x => x)
.ShouldHaveTheSameElementsAs(4, 5, 6, 7);

skipped.Where(x => x.ProjectionName == "NamedDocuments" && x.ShardName == "All")
.Select(x => x.EventSequence).OrderBy(x => x)
.ShouldHaveTheSameElementsAs(4, 5, 6, 7, 11, 14);

}
}

public class ErrorRejectingEventProjection: EventProjection
{
public ErrorRejectingEventProjection()
Expand Down
6 changes: 6 additions & 0 deletions src/EventSourcingTests/EventGraphTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ public void add_event_directly()
e.EventTypeName.ShouldNotBeNull();
}

[Fact]
public void enable_event_skipping_should_be_disabled_by_default()
{
theGraph.EnableEventSkippingInProjectionsOrSubscriptions.ShouldBeFalse();
}

public class HouseRemodeling
{
public Guid Id { get; set; }
Expand Down
Loading
Loading