Skip to content

Commit 4960397

Browse files
authored
IAsyncEnumerable: Fix cancellation propagation, bound operation times, use mark-and-sweep cleanup (#9387)
IAsyncEnumerable: bound polling, cancellation, and disposal time
1 parent f2ad524 commit 4960397

File tree

6 files changed

+357
-84
lines changed

6 files changed

+357
-84
lines changed

src/Orleans.Core.Abstractions/Runtime/AsyncEnumerableRequest.cs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public enum EnumerationResult
4949
/// </summary>
5050
Error = 1 << 5,
5151

52+
/// <summary>
53+
/// Enumeration was canceled.
54+
/// </summary>
55+
Canceled = 1 << 6,
56+
5257
/// <summary>
5358
/// This result indicates that enumeration has completed and that no further results will be produced.
5459
/// </summary>
@@ -244,26 +249,26 @@ public async ValueTask<bool> MoveNextAsync()
244249
(EnumerationResult Status, object Value) result;
245250
while (true)
246251
{
247-
if (_cancellationToken.IsCancellationRequested)
248-
{
249-
_current = default;
250-
return false;
251-
}
252+
_cancellationToken.ThrowIfCancellationRequested();
252253

253254
if (!_initialized)
254255
{
255-
result = await _target.StartEnumeration(_requestId, _request);
256+
result = await _target.StartEnumeration(_requestId, _request).AsTask().WaitAsync(_cancellationToken);
256257
_initialized = true;
257258
}
258259
else
259260
{
260-
result = await _target.MoveNext<T>(_requestId);
261+
result = await _target.MoveNext<T>(_requestId).AsTask().WaitAsync(_cancellationToken);
261262
}
262263

263264
if (result.Status is EnumerationResult.Error)
264265
{
265266
ExceptionDispatchInfo.Capture((Exception)result.Value).Throw();
266267
}
268+
else if (result.Status is EnumerationResult.Canceled)
269+
{
270+
throw new OperationCanceledException();
271+
}
267272

268273
if (result.Status is not EnumerationResult.Heartbeat)
269274
{
@@ -274,7 +279,7 @@ public async ValueTask<bool> MoveNextAsync()
274279
if (result.Status is EnumerationResult.MissingEnumeratorError)
275280
{
276281
throw new EnumerationAbortedException("Enumeration aborted: the remote target does not have a record of this enumerator."
277-
+ " This likely indicates that the remote grain was deactivated since enumeration begun.");
282+
+ " This likely indicates that the remote grain was deactivated since enumeration begun or that the enumerator was idle for longer than the expiration period.");
278283
}
279284

280285
Debug.Assert((result.Status & (EnumerationResult.Element | EnumerationResult.Batch | EnumerationResult.Completed)) != 0);

src/Orleans.Core/Configuration/Options/MessagingOptions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,5 +55,10 @@ public TimeSpan ResponseTimeout
5555
/// </summary>
5656
/// <value>The maximum message body size is 100 MB by default.</value>
5757
public int MaxMessageBodySize { get; set; } = 100 * 1024 * 1024;
58+
59+
/// <summary>
60+
/// Gets the response timeout underlying the <see cref="ResponseTimeout"/> property, without debugger checks.
61+
/// </summary>
62+
internal TimeSpan ConfiguredResponseTimeout => _responseTimeout;
5863
}
5964
}

0 commit comments

Comments
 (0)