diff --git a/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs index 768dd2507b..b4e09b9d8d 100644 --- a/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs +++ b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs @@ -129,6 +129,15 @@ public async Task try_to_query_for_non_stale_data_by_aggregate_type() await task; } + [Fact] + public async Task cleanly_query_with_no_data() + { + await theStore.Advanced.ResetAllData(); + + var items = await theSession.QueryForNonStaleData(3.Seconds()).ToListAsync(); + items.Count.ShouldBe(0); + } + public static async Task ExampleUsage() { #region sample_using_query_for_non_stale_data diff --git a/src/Marten/Events/AsyncProjectionTestingExtensions.cs b/src/Marten/Events/AsyncProjectionTestingExtensions.cs index 824590905c..c0be77e7e9 100644 --- a/src/Marten/Events/AsyncProjectionTestingExtensions.cs +++ b/src/Marten/Events/AsyncProjectionTestingExtensions.cs @@ -78,6 +78,9 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IDocumentStore /// /// Wait for any running async daemon to catch up to the latest event sequence at the time + /// this is invoked. *Note*, this method was intended for test automation and will wait + /// until there is any event data. If this is not what you intended, use WaitForNonStaleQueryableDataAsync() + /// instead /// /// /// Will be awaited till all shards have been started before checking if they've caught up @@ -110,11 +113,46 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase await Task.Delay(100.Milliseconds(), cancellationSource.Token).ConfigureAwait(false); } while (true); - if (initial.EventSequenceNumber == 0) + if (initial.EventSequenceNumber == 0 || initial.EventCount == 0) { throw new TimeoutException("No event activity was detected within the timeout span"); } + await database.WaitForNonStaleDataAsync(cancellationSource, projectionsCount, initial).ConfigureAwait(false); + } + + /// + /// Wait for all projections and subscriptions to catch up to the high water mark at the point this is called. + /// This method will cleanly exit if there is no event data upfront + /// + /// + /// + /// + public static async Task WaitForNonStaleQueryableDataAsync(this IMartenDatabase database, TimeSpan timeout, CancellationToken cancellation) + { + // Number of active projection shards, plus the high water mark + var projectionsCount = database.As().Options.Projections.AllShards().Count + 1; + + // Just get out of there if there are no projections + if (projectionsCount == 1) + { + return; + } + + var initial = await database.FetchEventStoreStatistics(cancellation).ConfigureAwait(false); + + // No data, get out of here + if (initial.EventCount == 0 || initial.EventSequenceNumber == 1) return; + + using var cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(timeout); + + await database.WaitForNonStaleDataAsync(cancellationSource, projectionsCount, initial).ConfigureAwait(false); + } + + public static async Task WaitForNonStaleDataAsync(this IMartenDatabase database, CancellationTokenSource cancellationSource, + int projectionsCount, EventStoreStatistics initial) + { IReadOnlyList projections = []; try { @@ -193,6 +231,7 @@ public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase } var highWaterMark = await database.FetchHighestEventSequenceNumber(token).ConfigureAwait(false); + if (highWaterMark <= 1) return; if (tracking.isComplete(highWaterMark)) return;