Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/DaemonTests/Internals/HighWaterAgentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,32 @@ public async Task will_not_go_in_loop_when_sequence_is_advanced_but_gaps_from_hi
await agent2.Tracker.WaitForHighWaterMark(NumberOfEvents + 5);
}

[Fact]
public async Task skips_multiple_gaps_and_keeps_advancing()
{
NumberOfStreams = 10;
await PublishSingleThreaded();

// Create multiple gaps in the event sequence
var gaps = new[] { NumberOfEvents - 9, NumberOfEvents - 6, NumberOfEvents - 5, NumberOfEvents - 3 };
await deleteEvents(gaps);

// Make gap skipping kick in faster
theStore.Options.Projections.StaleSequenceThreshold = 1.Seconds();

using var agent = await StartDaemon();

// Expect the agent to move beyond each gap by reporting the last contiguous (safe harbor) mark
await agent.Tracker.WaitForHighWaterMark(gaps[0], 2.Seconds());
await agent.Tracker.WaitForHighWaterMark(gaps[1], 2.Seconds());
await agent.Tracker.WaitForHighWaterMark(gaps[2], 2.Seconds());
await agent.Tracker.WaitForHighWaterMark(gaps[3], 2.Seconds());
// And eventually reach the head
await agent.Tracker.WaitForHighWaterMark(NumberOfEvents, 2.Seconds());

await agent.StopAllAsync();
}

private async Task deleteEvents(params long[] ids)
{
await using var conn = theStore.CreateConnection();
Expand Down