Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 109 additions & 34 deletions test/Orleans.DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs
Original file line number Diff line number Diff line change
@@ -1,26 +1,51 @@
#nullable enable
using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.Extensions.Time.Testing;
using Orleans.Configuration;
using Orleans.Hosting;
using Orleans.Internal;
using Orleans.TestingHost;
using TestExtensions;
using UnitTests.GrainInterfaces;
using Xunit;

namespace DefaultCluster.Tests;

public static class AsyncEnumerableGrainCallTestCollection
{
public const string Name = nameof(AsyncEnumerableGrainCallTests);
}

[CollectionDefinition(AsyncEnumerableGrainCallTestCollection.Name)]
public sealed class AsyncEnumerableGrainCallTestCollectionDefinition : ICollectionFixture<AsyncEnumerableGrainCallTests.Fixture>
{
}

/// <summary>
/// Tests support for grain methods which return <see cref="IAsyncEnumerable{T}"/>.
/// These tests verify Orleans' ability to handle streaming results from grain methods,
/// including batching, error handling, cancellation, and proper resource cleanup.
/// Orleans uses a grain extension mechanism to manage the lifecycle of async enumerators
/// across the distributed system.
/// </summary>
public class AsyncEnumerableGrainCallTests : HostedTestClusterEnsureDefaultStarted
[Collection(AsyncEnumerableGrainCallTestCollection.Name)]
public class AsyncEnumerableGrainCallTests
{
public AsyncEnumerableGrainCallTests(DefaultClusterFixture fixture) : base(fixture)
private readonly Fixture _fixture;

public AsyncEnumerableGrainCallTests(Fixture fixture)
{
_fixture = fixture;
}

private IGrainFactory GrainFactory => _fixture.GrainFactory;

private ILogger Logger => _fixture.Logger;

/// <summary>
/// Tests basic async enumerable functionality where a grain produces values that are consumed by the client.
/// Verifies that values are correctly transmitted and the enumerator is properly disposed after use.
Expand Down Expand Up @@ -543,9 +568,8 @@ public async Task ObservableGrain_AsyncEnumerable_SlowProducer()
public async Task ObservableGrain_AsyncEnumerable_SlowConsumer()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var cleanupInterval = TimeSpan.FromMilliseconds(1_000);
var grain = GrainFactory.GetGrain<IObservableGrain>(Guid.NewGuid());
using var listener = new AsyncEnumerableGrainExtensionListener(grain.GetGrainId(), cleanupInterval);
using var listener = new AsyncEnumerableGrainExtensionListener(grain.GetGrainId());

var producer = Task.Run(async () =>
{
Expand All @@ -558,20 +582,11 @@ public async Task ObservableGrain_AsyncEnumerable_SlowConsumer()
});

var values = new List<string>();
var cleanupCountBeforeMoveNext = listener.CleanupCount;
await foreach (var entry in grain.GetValues().WithBatchSize(1))
{
values.Add(entry);

// Wait for one cleanup cycle before reading the next value.
// Track the count captured before the corresponding MoveNext call to avoid waiting
// for a second cycle if cleanup happens right after MoveNext completes.
while (listener.CleanupCount == cleanupCountBeforeMoveNext)
{
await Task.Delay(cleanupInterval / 10, cts.Token);
}

cleanupCountBeforeMoveNext = listener.CleanupCount;
await AdvanceToNextCleanupAsync(listener, cts.Token);
Logger.LogInformation("ObservableGrain_AsyncEnumerable: {Entry}", entry);
}

Expand All @@ -592,9 +607,8 @@ public async Task ObservableGrain_AsyncEnumerable_SlowConsumer()
public async Task ObservableGrain_AsyncEnumerable_SlowConsumer_Evicted()
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var cleanupInterval = TimeSpan.FromMilliseconds(1_000);
var grain = GrainFactory.GetGrain<IObservableGrain>(Guid.NewGuid());
using var listener = new AsyncEnumerableGrainExtensionListener(grain.GetGrainId(), cleanupInterval);
using var listener = new AsyncEnumerableGrainExtensionListener(grain.GetGrainId());

var producer = Task.Run(async () =>
{
Expand All @@ -613,15 +627,10 @@ public async Task ObservableGrain_AsyncEnumerable_SlowConsumer_Evicted()
{
values.Add(entry);

// After the 3rd iteration, sleep for longer than the cleanup duration
// and wait for the enumerator to be cleaned up.
if (values.Count >= 3)
if (values.Count == 3)
{
var initialCleanupCount = listener.CleanupCount;
while (listener.CleanupCount < initialCleanupCount + 2)
{
await Task.Delay(cleanupInterval, cts.Token);
}
await AdvanceToNextCleanupAsync(listener, cts.Token);
await AdvanceToNextCleanupAsync(listener, cts.Token);
}

Logger.LogInformation("ObservableGrain_AsyncEnumerable: {Entry}", entry);
Expand Down Expand Up @@ -675,26 +684,89 @@ await Assert.ThrowsAsync<EnumerationAbortedException>(async () =>
Assert.Equal(2, values.Count);
}

private async Task AdvanceToNextCleanupAsync(AsyncEnumerableGrainExtensionListener listener, CancellationToken cancellationToken)
{
var cleanupCount = listener.CleanupCount + 1;
_fixture.AdvanceTimeByResponseTimeout();
await listener.WaitForCleanupCountAsync(cleanupCount, cancellationToken);
}

/// <summary>
/// Test fixture which uses a fake silo time provider so cleanup timers can be advanced deterministically.
/// </summary>
public sealed class Fixture : BaseInProcessTestClusterFixture
{
private readonly FakeTimeProvider _timeProvider = new(new DateTimeOffset(2026, 1, 1, 0, 0, 0, TimeSpan.Zero));

protected override void ConfigureTestCluster(InProcessTestClusterBuilder builder)
{
builder.ConfigureSilo((_, siloBuilder) =>
{
siloBuilder
.Configure<SiloMessagingOptions>(o => o.ClientGatewayShutdownNotificationTimeout = default)
.UseInMemoryReminderService()
.UseInMemoryDurableJobs()
.AddMemoryGrainStorageAsDefault()
.AddMemoryGrainStorage("MemoryStore");
Comment thread
ReubenBond marked this conversation as resolved.

siloBuilder.Services.Replace(ServiceDescriptor.Singleton<TimeProvider>(_timeProvider));
});
}

public void AdvanceTimeByResponseTimeout() =>
_timeProvider.Advance(HostedCluster.GetSiloServiceProvider().GetRequiredService<IOptions<SiloMessagingOptions>>().Value.ResponseTimeout);
}

/// <summary>
/// Diagnostic listener for monitoring AsyncEnumerableGrainExtension behavior during tests.
/// This helper class allows tests to observe internal cleanup operations and verify
/// that enumerators are properly managed according to their lifecycle requirements.
/// </summary>
private sealed class AsyncEnumerableGrainExtensionListener : IObserver<KeyValuePair<string, object?>>, IObserver<DiagnosticListener>, IDisposable
{
private readonly object _lock = new();
private readonly IDisposable _allListenersSubscription;
private readonly GrainId _targetGrainId;
private readonly TimeSpan _enumeratorCleanupInterval;
private IDisposable? _instanceSubscription;
private TaskCompletionSource? _cleanupCompleted;
private int _cleanupCount;

public AsyncEnumerableGrainExtensionListener(GrainId targetGrainId, TimeSpan enumeratorCleanupInterval)
public AsyncEnumerableGrainExtensionListener(GrainId targetGrainId)
{
_allListenersSubscription = DiagnosticListener.AllListeners.Subscribe(this);
_targetGrainId = targetGrainId;
_enumeratorCleanupInterval = enumeratorCleanupInterval;
}

public int CleanupCount { get; private set; }
public int CleanupCount
{
get
{
lock (_lock)
{
return _cleanupCount;
}
}
}

public async Task WaitForCleanupCountAsync(int cleanupCount, CancellationToken cancellationToken)
{
while (true)
{
Task cleanupCompletedTask;
lock (_lock)
{
if (_cleanupCount >= cleanupCount)
{
return;
}

_cleanupCompleted ??= new(TaskCreationOptions.RunContinuationsAsynchronously);
cleanupCompletedTask = _cleanupCompleted.Task;
}

await cleanupCompletedTask.WaitAsync(cancellationToken);
}
}

void IObserver<KeyValuePair<string, object?>>.OnCompleted()
{
Expand All @@ -713,14 +785,17 @@ public AsyncEnumerableGrainExtensionListener(GrainId targetGrainId, TimeSpan enu
return;
}

if (value.Key == "OnAsyncEnumeratorGrainExtensionCreated")
{
extension.Timer.Change(_enumeratorCleanupInterval, _enumeratorCleanupInterval);
}

if (value.Key == "OnEnumeratorCleanupCompleted")
{
++CleanupCount;
TaskCompletionSource? cleanupCompleted;
lock (_lock)
{
++_cleanupCount;
cleanupCompleted = _cleanupCompleted;
_cleanupCompleted = null;
}

cleanupCompleted?.TrySetResult();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.TimeProvider.Testing" />
<PackageReference Include="NodaTime" />
<PackageReference Include="Microsoft.NET.Test.Sdk" />
</ItemGroup>
Expand Down