From 0759ce6dcd73762d1c7330ff79954250732cdfc4 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Thu, 14 May 2026 11:03:40 -0700 Subject: [PATCH 1/2] Stabilize async enumerable cleanup tests Use a FakeTimeProvider-backed fixture for AsyncEnumerableGrainCallTests so slow-consumer cleanup can be advanced deterministically instead of relying on wall-clock polling. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../AsyncEnumerableGrainCallTests.cs | 158 ++++++++++++++---- .../Orleans.DefaultCluster.Tests.csproj | 1 + 2 files changed, 125 insertions(+), 34 deletions(-) diff --git a/test/Orleans.DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs b/test/Orleans.DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs index a66d842df6d..429b20082bc 100644 --- a/test/Orleans.DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs +++ b/test/Orleans.DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs @@ -1,13 +1,30 @@ #nullable enable using System.Diagnostics; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Microsoft.Extensions.Time.Testing; +using Orleans.Configuration; +using Orleans.Hosting; using Orleans.Internal; +using Orleans.TestingHost; using TestExtensions; using UnitTests.GrainInterfaces; using Xunit; namespace DefaultCluster.Tests; +public static class AsyncEnumerableGrainCallTestCollection +{ + public const string Name = nameof(AsyncEnumerableGrainCallTests); +} + +[CollectionDefinition(AsyncEnumerableGrainCallTestCollection.Name)] +public sealed class AsyncEnumerableGrainCallTestCollectionDefinition : ICollectionFixture +{ +} + /// /// Tests support for grain methods which return . /// These tests verify Orleans' ability to handle streaming results from grain methods, @@ -15,12 +32,20 @@ namespace DefaultCluster.Tests; /// Orleans uses a grain extension mechanism to manage the lifecycle of async enumerators /// across the distributed system. /// -public class AsyncEnumerableGrainCallTests : HostedTestClusterEnsureDefaultStarted +[Collection(AsyncEnumerableGrainCallTestCollection.Name)] +public class AsyncEnumerableGrainCallTests { - public AsyncEnumerableGrainCallTests(DefaultClusterFixture fixture) : base(fixture) + private readonly Fixture _fixture; + + public AsyncEnumerableGrainCallTests(Fixture fixture) { + _fixture = fixture; } + private IGrainFactory GrainFactory => _fixture.GrainFactory; + + private ILogger Logger => _fixture.Logger; + /// /// Tests basic async enumerable functionality where a grain produces values that are consumed by the client. /// Verifies that values are correctly transmitted and the enumerator is properly disposed after use. @@ -543,9 +568,8 @@ public async Task ObservableGrain_AsyncEnumerable_SlowProducer() public async Task ObservableGrain_AsyncEnumerable_SlowConsumer() { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - var cleanupInterval = TimeSpan.FromMilliseconds(1_000); var grain = GrainFactory.GetGrain(Guid.NewGuid()); - using var listener = new AsyncEnumerableGrainExtensionListener(grain.GetGrainId(), cleanupInterval); + using var listener = new AsyncEnumerableGrainExtensionListener(grain.GetGrainId()); var producer = Task.Run(async () => { @@ -558,20 +582,11 @@ public async Task ObservableGrain_AsyncEnumerable_SlowConsumer() }); var values = new List(); - var cleanupCountBeforeMoveNext = listener.CleanupCount; await foreach (var entry in grain.GetValues().WithBatchSize(1)) { values.Add(entry); - // Wait for one cleanup cycle before reading the next value. - // Track the count captured before the corresponding MoveNext call to avoid waiting - // for a second cycle if cleanup happens right after MoveNext completes. - while (listener.CleanupCount == cleanupCountBeforeMoveNext) - { - await Task.Delay(cleanupInterval / 10, cts.Token); - } - - cleanupCountBeforeMoveNext = listener.CleanupCount; + await AdvanceToNextCleanupAsync(listener, cts.Token); Logger.LogInformation("ObservableGrain_AsyncEnumerable: {Entry}", entry); } @@ -592,9 +607,8 @@ public async Task ObservableGrain_AsyncEnumerable_SlowConsumer() public async Task ObservableGrain_AsyncEnumerable_SlowConsumer_Evicted() { using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); - var cleanupInterval = TimeSpan.FromMilliseconds(1_000); var grain = GrainFactory.GetGrain(Guid.NewGuid()); - using var listener = new AsyncEnumerableGrainExtensionListener(grain.GetGrainId(), cleanupInterval); + using var listener = new AsyncEnumerableGrainExtensionListener(grain.GetGrainId()); var producer = Task.Run(async () => { @@ -613,15 +627,10 @@ public async Task ObservableGrain_AsyncEnumerable_SlowConsumer_Evicted() { values.Add(entry); - // After the 3rd iteration, sleep for longer than the cleanup duration - // and wait for the enumerator to be cleaned up. - if (values.Count >= 3) + if (values.Count == 3) { - var initialCleanupCount = listener.CleanupCount; - while (listener.CleanupCount < initialCleanupCount + 2) - { - await Task.Delay(cleanupInterval, cts.Token); - } + await AdvanceToNextCleanupAsync(listener, cts.Token); + await AdvanceToNextCleanupAsync(listener, cts.Token); } Logger.LogInformation("ObservableGrain_AsyncEnumerable: {Entry}", entry); @@ -675,6 +684,54 @@ await Assert.ThrowsAsync(async () => Assert.Equal(2, values.Count); } + private async Task AdvanceToNextCleanupAsync(AsyncEnumerableGrainExtensionListener listener, CancellationToken cancellationToken) + { + var cleanupCount = listener.CleanupCount + 1; + _fixture.AdvanceTimeByResponseTimeout(); + await listener.WaitForCleanupCountAsync(cleanupCount, cancellationToken); + } + + /// + /// Test fixture which uses a fake silo time provider so cleanup timers can be advanced deterministically. + /// + public sealed class Fixture : BaseTestClusterFixture + { + private readonly FakeTimeProvider _timeProvider = new(new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero)); + + protected override void ConfigureTestCluster(TestClusterBuilder builder) + { + TimeProviderSiloConfigurator.TimeProvider = _timeProvider; + builder.AddSiloBuilderConfigurator(); + builder.AddSiloBuilderConfigurator(); + } + + public void AdvanceTimeByResponseTimeout() => + _timeProvider.Advance(((InProcessSiloHandle)HostedCluster.Primary).ServiceProvider.GetRequiredService>().Value.ResponseTimeout); + + public override async Task DisposeAsync() + { + try + { + await base.DisposeAsync(); + } + finally + { + TimeProviderSiloConfigurator.TimeProvider = null; + } + } + + private sealed class TimeProviderSiloConfigurator : ISiloConfigurator + { + public static FakeTimeProvider? TimeProvider { get; set; } + + public void Configure(ISiloBuilder siloBuilder) + { + var timeProvider = TimeProvider ?? throw new InvalidOperationException("The fake time provider has not been configured."); + siloBuilder.Services.Replace(ServiceDescriptor.Singleton(timeProvider)); + } + } + } + /// /// Diagnostic listener for monitoring AsyncEnumerableGrainExtension behavior during tests. /// This helper class allows tests to observe internal cleanup operations and verify @@ -682,19 +739,49 @@ await Assert.ThrowsAsync(async () => /// private sealed class AsyncEnumerableGrainExtensionListener : IObserver>, IObserver, IDisposable { + private readonly object _lock = new(); private readonly IDisposable _allListenersSubscription; private readonly GrainId _targetGrainId; - private readonly TimeSpan _enumeratorCleanupInterval; private IDisposable? _instanceSubscription; + private TaskCompletionSource? _cleanupCompleted; + private int _cleanupCount; - public AsyncEnumerableGrainExtensionListener(GrainId targetGrainId, TimeSpan enumeratorCleanupInterval) + public AsyncEnumerableGrainExtensionListener(GrainId targetGrainId) { _allListenersSubscription = DiagnosticListener.AllListeners.Subscribe(this); _targetGrainId = targetGrainId; - _enumeratorCleanupInterval = enumeratorCleanupInterval; } - public int CleanupCount { get; private set; } + public int CleanupCount + { + get + { + lock (_lock) + { + return _cleanupCount; + } + } + } + + public async Task WaitForCleanupCountAsync(int cleanupCount, CancellationToken cancellationToken) + { + while (true) + { + Task cleanupCompletedTask; + lock (_lock) + { + if (_cleanupCount >= cleanupCount) + { + return; + } + + _cleanupCompleted ??= new(TaskCreationOptions.RunContinuationsAsynchronously); + cleanupCompletedTask = _cleanupCompleted.Task; + } + + await cleanupCompletedTask.WaitAsync(cancellationToken); + } + } void IObserver>.OnCompleted() { @@ -713,14 +800,17 @@ public AsyncEnumerableGrainExtensionListener(GrainId targetGrainId, TimeSpan enu return; } - if (value.Key == "OnAsyncEnumeratorGrainExtensionCreated") - { - extension.Timer.Change(_enumeratorCleanupInterval, _enumeratorCleanupInterval); - } - if (value.Key == "OnEnumeratorCleanupCompleted") { - ++CleanupCount; + TaskCompletionSource? cleanupCompleted; + lock (_lock) + { + ++_cleanupCount; + cleanupCompleted = _cleanupCompleted; + _cleanupCompleted = null; + } + + cleanupCompleted?.TrySetResult(); } } diff --git a/test/Orleans.DefaultCluster.Tests/Orleans.DefaultCluster.Tests.csproj b/test/Orleans.DefaultCluster.Tests/Orleans.DefaultCluster.Tests.csproj index d3c0b97a65c..ea825ec81bf 100644 --- a/test/Orleans.DefaultCluster.Tests/Orleans.DefaultCluster.Tests.csproj +++ b/test/Orleans.DefaultCluster.Tests/Orleans.DefaultCluster.Tests.csproj @@ -5,6 +5,7 @@ + From ff4bc5d2693c59c8061e4f1df93bb9c5fad2b151 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Thu, 14 May 2026 11:09:29 -0700 Subject: [PATCH 2/2] Use in-process cluster for async enumerable tests Switch the fake-time async enumerable test fixture from TestClusterBuilder to InProcessTestClusterBuilder while keeping deterministic cleanup advancement. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../AsyncEnumerableGrainCallTests.cs | 43 ++++++------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/test/Orleans.DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs b/test/Orleans.DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs index 429b20082bc..f84df977ba4 100644 --- a/test/Orleans.DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs +++ b/test/Orleans.DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs @@ -694,42 +694,27 @@ private async Task AdvanceToNextCleanupAsync(AsyncEnumerableGrainExtensionListen /// /// Test fixture which uses a fake silo time provider so cleanup timers can be advanced deterministically. /// - public sealed class Fixture : BaseTestClusterFixture + public sealed class Fixture : BaseInProcessTestClusterFixture { private readonly FakeTimeProvider _timeProvider = new(new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero)); - protected override void ConfigureTestCluster(TestClusterBuilder builder) + protected override void ConfigureTestCluster(InProcessTestClusterBuilder builder) { - TimeProviderSiloConfigurator.TimeProvider = _timeProvider; - builder.AddSiloBuilderConfigurator(); - builder.AddSiloBuilderConfigurator(); - } - - public void AdvanceTimeByResponseTimeout() => - _timeProvider.Advance(((InProcessSiloHandle)HostedCluster.Primary).ServiceProvider.GetRequiredService>().Value.ResponseTimeout); - - public override async Task DisposeAsync() - { - try - { - await base.DisposeAsync(); - } - finally + builder.ConfigureSilo((_, siloBuilder) => { - TimeProviderSiloConfigurator.TimeProvider = null; - } - } - - private sealed class TimeProviderSiloConfigurator : ISiloConfigurator - { - public static FakeTimeProvider? TimeProvider { get; set; } + siloBuilder + .Configure(o => o.ClientGatewayShutdownNotificationTimeout = default) + .UseInMemoryReminderService() + .UseInMemoryDurableJobs() + .AddMemoryGrainStorageAsDefault() + .AddMemoryGrainStorage("MemoryStore"); - public void Configure(ISiloBuilder siloBuilder) - { - var timeProvider = TimeProvider ?? throw new InvalidOperationException("The fake time provider has not been configured."); - siloBuilder.Services.Replace(ServiceDescriptor.Singleton(timeProvider)); - } + siloBuilder.Services.Replace(ServiceDescriptor.Singleton(_timeProvider)); + }); } + + public void AdvanceTimeByResponseTimeout() => + _timeProvider.Advance(HostedCluster.GetSiloServiceProvider().GetRequiredService>().Value.ResponseTimeout); } ///