diff --git a/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs index a56bb21840..b59c8f3922 100644 --- a/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs +++ b/src/EventSourcingTests/Aggregation/querying_with_non_stale_data.cs @@ -81,7 +81,7 @@ public async Task try_to_use_wait_for_non_stale_data_by_aggregate_type() await theSession.SaveChangesAsync(); var waiter = Task.Run(async () => - await theStore.Storage.Database.WaitForNonStaleProjectionDataAsync(typeof(SimpleAggregate), 5.Seconds(), + await theStore.Storage.Database.WaitForNonStaleProjectionDataAsync([typeof(SimpleAggregate)], 5.Seconds(), CancellationToken.None)); using var daemon = await theStore.BuildProjectionDaemonAsync(); diff --git a/src/Marten/AdvancedOperations.cs b/src/Marten/AdvancedOperations.cs index 7674ebde4c..37f12c2700 100644 --- a/src/Marten/AdvancedOperations.cs +++ b/src/Marten/AdvancedOperations.cs @@ -165,6 +165,34 @@ public async Task ProjectionProgressFor(ShardName name, string? tenantId = return await database.ProjectionProgressFor(name, token).ConfigureAwait(false); } + /// + /// If you are running the async daemon, this will "wait" for any asynchronous projections building + /// the supplied projected aggregate types to catch up to the current event store high water mark + /// + /// + /// + /// + /// + public Task WaitForNonStaleDataAsync(CancellationToken cancellation, TimeSpan timeout, params Type[] aggregateTypes) + { + return _store.Tenancy.Default.Database.WaitForNonStaleProjectionDataAsync(aggregateTypes, timeout, + cancellation); + } + + /// + /// If you are running the async daemon, this will "wait" for any asynchronous projections building + /// the supplied projected aggregate types to catch up to the current event store high water mark + /// + /// + /// + /// + /// + public async Task WaitForNonStaleDataAsync(CancellationToken cancellation, TimeSpan timeout, string tenantId, params Type[] aggregateTypes) + { + var tenant = await _store.Tenancy.GetTenantAsync(_store.Options.MaybeCorrectTenantId(tenantId)).ConfigureAwait(false); + await tenant.Database.WaitForNonStaleProjectionDataAsync(aggregateTypes, timeout, cancellation).ConfigureAwait(false); + } + /// /// Marten's built in test support for event projections. Only use this in testing as /// it will delete existing event and projected aggregate data diff --git a/src/Marten/Events/AsyncProjectionTestingExtensions.cs b/src/Marten/Events/AsyncProjectionTestingExtensions.cs index 406d5cfa05..4e5c4df6ae 100644 --- a/src/Marten/Events/AsyncProjectionTestingExtensions.cs +++ b/src/Marten/Events/AsyncProjectionTestingExtensions.cs @@ -136,11 +136,11 @@ private static bool isComplete(this Dictionary tracking, long high return tracking.Values.All(x => x >= highWaterMark); } - public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase database, Type aggregationType, TimeSpan timeout, CancellationToken token) + public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase database, Type[] aggregationTypes, TimeSpan timeout, CancellationToken token) { // Number of active projection shards, plus the high water mark - var shards = database.As().Options.Projections.AsyncShardsPublishingType(aggregationType); - if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}"); + var shards = aggregationTypes.SelectMany(x => database.As().Options.Projections.AsyncShardsPublishingType(x)).ToArray() ; + if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationTypes.Select(x => x.FullNameInCode()).Join(", ")}"); var tracking = new Dictionary(); foreach (var shard in shards) diff --git a/src/Marten/Linq/MartenLinqQueryProvider.cs b/src/Marten/Linq/MartenLinqQueryProvider.cs index 5cc61de885..554183144f 100644 --- a/src/Marten/Linq/MartenLinqQueryProvider.cs +++ b/src/Marten/Linq/MartenLinqQueryProvider.cs @@ -78,7 +78,7 @@ internal async ValueTask EnsureStorageExistsAsync(LinqQueryParser parser, if (Waiter != null) { - await _session.Database.WaitForNonStaleProjectionDataAsync(SourceType, Waiter.Timeout, cancellationToken).ConfigureAwait(false); + await _session.Database.WaitForNonStaleProjectionDataAsync([SourceType], Waiter.Timeout, cancellationToken).ConfigureAwait(false); } }