Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,15 @@ public async Task try_to_query_for_non_stale_data_by_aggregate_type()
await task;
}

[Fact]
public async Task cleanly_query_with_no_data()
{
await theStore.Advanced.ResetAllData();

var items = await theSession.QueryForNonStaleData<SimpleAggregate>(3.Seconds()).ToListAsync();
items.Count.ShouldBe(0);
}

public static async Task ExampleUsage()
{
#region sample_using_query_for_non_stale_data
Expand Down
41 changes: 40 additions & 1 deletion src/Marten/Events/AsyncProjectionTestingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IDocumentStore

/// <summary>
/// Wait for any running async daemon to catch up to the latest event sequence at the time
/// this is invoked. *Note*, this method was intended for test automation and will wait
/// until there is any event data. If this is not what you intended, use WaitForNonStaleQueryableDataAsync()
/// instead
/// </summary>
/// <param name="projectionsCount">
/// Will be awaited till all shards have been started before checking if they've caught up
Expand Down Expand Up @@ -110,11 +113,46 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
await Task.Delay(100.Milliseconds(), cancellationSource.Token).ConfigureAwait(false);
} while (true);

if (initial.EventSequenceNumber == 0)
if (initial.EventSequenceNumber == 0 || initial.EventCount == 0)
{
throw new TimeoutException("No event activity was detected within the timeout span");
}

await database.WaitForNonStaleDataAsync(cancellationSource, projectionsCount, initial).ConfigureAwait(false);
}

/// <summary>
/// Wait for all projections and subscriptions to catch up to the high water mark at the point this is called.
/// This method will cleanly exit if there is no event data upfront
/// </summary>
/// <param name="database"></param>
/// <param name="timeout"></param>
/// <param name="cancellation"></param>
public static async Task WaitForNonStaleQueryableDataAsync(this IMartenDatabase database, TimeSpan timeout, CancellationToken cancellation)
{
// Number of active projection shards, plus the high water mark
var projectionsCount = database.As<MartenDatabase>().Options.Projections.AllShards().Count + 1;

// Just get out of there if there are no projections
if (projectionsCount == 1)
{
return;
}

var initial = await database.FetchEventStoreStatistics(cancellation).ConfigureAwait(false);

// No data, get out of here
if (initial.EventCount == 0 || initial.EventSequenceNumber == 1) return;

using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(timeout);

await database.WaitForNonStaleDataAsync(cancellationSource, projectionsCount, initial).ConfigureAwait(false);
}

public static async Task WaitForNonStaleDataAsync(this IMartenDatabase database, CancellationTokenSource cancellationSource,
int projectionsCount, EventStoreStatistics initial)
{
IReadOnlyList<ShardState> projections = [];
try
{
Expand Down Expand Up @@ -193,6 +231,7 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase
}

var highWaterMark = await database.FetchHighestEventSequenceNumber(token).ConfigureAwait(false);
if (highWaterMark <= 1) return;

if (tracking.isComplete(highWaterMark)) return;

Expand Down
Loading