Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task try_to_use_wait_for_non_stale_data_by_aggregate_type()
await theSession.SaveChangesAsync();

var waiter = Task.Run(async () =>
await theStore.Storage.Database.WaitForNonStaleProjectionDataAsync(typeof(SimpleAggregate), 5.Seconds(),
await theStore.Storage.Database.WaitForNonStaleProjectionDataAsync([typeof(SimpleAggregate)], 5.Seconds(),
CancellationToken.None));

using var daemon = await theStore.BuildProjectionDaemonAsync();
Expand Down
28 changes: 28 additions & 0 deletions src/Marten/AdvancedOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,34 @@ public async Task<long> ProjectionProgressFor(ShardName name, string? tenantId =
return await database.ProjectionProgressFor(name, token).ConfigureAwait(false);
}

/// <summary>
/// If you are running the async daemon, this will "wait" for any asynchronous projections building
/// the supplied projected aggregate types to catch up to the current event store high water mark
/// </summary>
/// <param name="cancellation"></param>
/// <param name="timeout"></param>
/// <param name="aggregateTypes"></param>
/// <returns></returns>
public Task WaitForNonStaleDataAsync(CancellationToken cancellation, TimeSpan timeout, params Type[] aggregateTypes)
{
return _store.Tenancy.Default.Database.WaitForNonStaleProjectionDataAsync(aggregateTypes, timeout,
cancellation);
}

/// <summary>
/// If you are running the async daemon, this will "wait" for any asynchronous projections building
/// the supplied projected aggregate types to catch up to the current event store high water mark
/// </summary>
/// <param name="cancellation"></param>
/// <param name="timeout"></param>
/// <param name="tenantId"></param>
/// <param name="aggregateTypes"></param>
public async Task WaitForNonStaleDataAsync(CancellationToken cancellation, TimeSpan timeout, string tenantId, params Type[] aggregateTypes)
{
var tenant = await _store.Tenancy.GetTenantAsync(_store.Options.MaybeCorrectTenantId(tenantId)).ConfigureAwait(false);
await tenant.Database.WaitForNonStaleProjectionDataAsync(aggregateTypes, timeout, cancellation).ConfigureAwait(false);
}

/// <summary>
/// Marten's built in test support for event projections. Only use this in testing as
/// it will delete existing event and projected aggregate data
Expand Down
6 changes: 3 additions & 3 deletions src/Marten/Events/AsyncProjectionTestingExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,11 @@ private static bool isComplete(this Dictionary<string, long> tracking, long high
return tracking.Values.All(x => x >= highWaterMark);
}

public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase database, Type aggregationType, TimeSpan timeout, CancellationToken token)
public static async Task WaitForNonStaleProjectionDataAsync(this IMartenDatabase database, Type[] aggregationTypes, TimeSpan timeout, CancellationToken token)
{
// Number of active projection shards, plus the high water mark
var shards = database.As<MartenDatabase>().Options.Projections.AsyncShardsPublishingType(aggregationType);
if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationType.FullNameInCode()}");
var shards = aggregationTypes.SelectMany(x => database.As<MartenDatabase>().Options.Projections.AsyncShardsPublishingType(x)).ToArray() ;
if (!shards.Any()) throw new InvalidOperationException($"Cannot find any registered async projection shards for aggregate type {aggregationTypes.Select(x => x.FullNameInCode()).Join(", ")}");

var tracking = new Dictionary<string, long>();
foreach (var shard in shards)
Expand Down
2 changes: 1 addition & 1 deletion src/Marten/Linq/MartenLinqQueryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ internal async ValueTask EnsureStorageExistsAsync(LinqQueryParser parser,

if (Waiter != null)
{
await _session.Database.WaitForNonStaleProjectionDataAsync(SourceType, Waiter.Timeout, cancellationToken).ConfigureAwait(false);
await _session.Database.WaitForNonStaleProjectionDataAsync([SourceType], Waiter.Timeout, cancellationToken).ConfigureAwait(false);
}
}

Expand Down
Loading