Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/Orleans.Core.Abstractions/Runtime/AsyncEnumerableRequest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.ExceptionServices;
using System.Runtime.Serialization;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -43,6 +44,11 @@ public enum EnumerationResult
/// </summary>
MissingEnumeratorError = 1 << 4,

/// <summary>
/// The attempt to enumerate failed because the enumeration threw an exception.
/// </summary>
Error = 1 << 5,

/// <summary>
/// This result indicates that enumeration has completed and that no further results will be produced.
/// </summary>
Expand Down Expand Up @@ -254,6 +260,11 @@ public async ValueTask<bool> MoveNextAsync()
result = await _target.MoveNext<T>(_requestId);
}

if (result.Status is EnumerationResult.Error)
{
ExceptionDispatchInfo.Capture((Exception)result.Value).Throw();
}

if (result.Status is not EnumerationResult.Heartbeat)
{
break;
Expand Down
10 changes: 9 additions & 1 deletion src/Orleans.Core/Runtime/AsyncEnumerableGrainExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private async ValueTask RemoveExpiredAsync(CancellationToken cancellationToken)

// Wait for either the MoveNextAsync task to complete or the cancellation token to be cancelled.
await moveNextTask.WaitAsync(cancellation.Token).SuppressThrowing();
if (moveNextTask is {IsCompletedSuccessfully: true })
if (moveNextTask.IsCompletedSuccessfully)
{
OnMoveNext(requestId);
var hasValue = moveNextTask.GetAwaiter().GetResult();
Expand All @@ -231,6 +231,14 @@ private async ValueTask RemoveExpiredAsync(CancellationToken cancellationToken)
return (EnumerationResult.Completed, default);
}
}
else if (moveNextTask.Exception is { } moveNextException)
{
// Completed, but not successfully.
var exception = moveNextException.InnerExceptions.Count == 1 ? moveNextException.InnerException : moveNextException;
await RemoveEnumeratorAsync(requestId);
await typedEnumerator.DisposeAsync();
return (EnumerationResult.Error, exception);
}

return (EnumerationResult.Heartbeat, default);
}
Expand Down
39 changes: 38 additions & 1 deletion test/DefaultCluster.Tests/AsyncEnumerableGrainCallTests.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging;
using Orleans.Internal;
using Orleans.Runtime;
using TestExtensions;
Expand Down Expand Up @@ -46,6 +46,43 @@ public async Task ObservableGrain_AsyncEnumerable()
Assert.Contains(grainCalls, c => c.InterfaceName.Contains(nameof(IAsyncEnumerableGrainExtension)) && c.MethodName.Contains(nameof(IAsyncDisposable.DisposeAsync)));
}

[Theory, TestCategory("BVT"), TestCategory("Observable")]
[InlineData(0, false)]
[InlineData(0, true)]
[InlineData(1, false)]
[InlineData(1, true)]
[InlineData(9, false)]
[InlineData(9, true)]
[InlineData(10, false)]
[InlineData(10, true)]
[InlineData(11, false)]
[InlineData(11, true)]
public async Task ObservableGrain_AsyncEnumerable_Throws(int errorIndex, bool waitAfterYield)
{
const string ErrorMessage = "This is my error!";
var grain = GrainFactory.GetGrain<IObservableGrain>(Guid.NewGuid());

var values = new List<int>();
try
{
await foreach (var entry in grain.GetValuesWithError(errorIndex, waitAfterYield, ErrorMessage).WithBatchSize(10))
{
values.Add(entry);
Logger.LogInformation("ObservableGrain_AsyncEnumerable: {Entry}", entry);
}
}
catch (InvalidOperationException iox)
{
Assert.Equal(ErrorMessage, iox.Message);
}

Assert.Equal(errorIndex, values.Count);

// Check that the enumerator is disposed
var grainCalls = await grain.GetIncomingCalls();
Assert.Contains(grainCalls, c => c.InterfaceName.Contains(nameof(IAsyncEnumerableGrainExtension)) && c.MethodName.Contains(nameof(IAsyncDisposable.DisposeAsync)));
}

[Fact, TestCategory("BVT"), TestCategory("Observable")]
public async Task ObservableGrain_AsyncEnumerable_Batch()
{
Expand Down
3 changes: 2 additions & 1 deletion test/Grains/TestGrainInterfaces/IObservableGrain.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace UnitTests.GrainInterfaces
namespace UnitTests.GrainInterfaces
{
/// <summary>
/// A grain which returns IAsyncEnumerable
Expand All @@ -10,6 +10,7 @@ public interface IObservableGrain : IGrainWithGuidKey
ValueTask Deactivate();
ValueTask OnNext(string data);
IAsyncEnumerable<string> GetValues();
IAsyncEnumerable<int> GetValuesWithError(int errorIndex, bool waitAfterYield, string errorMessage);

ValueTask<List<(string InterfaceName, string MethodName)>> GetIncomingCalls();
}
Expand Down
17 changes: 16 additions & 1 deletion test/Grains/TestInternalGrains/ObservableGrain.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Threading.Channels;
using System.Threading.Channels;
using UnitTests.GrainInterfaces;

namespace UnitTests.Grains
Expand All @@ -10,6 +10,21 @@ public class ObservableGrain : Grain, IObservableGrain, IIncomingGrainCallFilter

public IAsyncEnumerable<string> GetValues() => _updates.Reader.ReadAllAsync();

public async IAsyncEnumerable<int> GetValuesWithError(int errorIndex, bool waitAfterYield, string errorMessage)
{
await Task.CompletedTask;
for (var i = 0; i < int.MaxValue; i++)
{
if (i == errorIndex)
{
throw new InvalidOperationException(errorMessage);
}

yield return i;
await Task.Yield();
}
}

public ValueTask Complete()
{
_updates.Writer.Complete();
Expand Down
Loading