From 3de1a53c5eb509a71ba3b9160d6b052c633e1c47 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Wed, 26 Feb 2025 11:10:17 -0800 Subject: [PATCH] Align AsyncEnumerable cancellation semantics to .NET expectations --- src/Orleans.Core/Utils/AsyncEnumerable.cs | 28 ++++++------------- .../KubernetesClusterAgent.cs | 5 +++- .../GrainDirectory/CachedGrainLocator.cs | 4 ++- .../GrainDirectory/ClientDirectory.cs | 3 +- .../MembershipService/ClusterHealthMonitor.cs | 4 +++ .../ClusterMembershipService.cs | 4 +++ .../SiloStatusListenerManager.cs | 4 +++ .../QueueBalancer/QueueBalancerBase.cs | 3 +- 8 files changed, 31 insertions(+), 24 deletions(-) diff --git a/src/Orleans.Core/Utils/AsyncEnumerable.cs b/src/Orleans.Core/Utils/AsyncEnumerable.cs index 0eb2a02ab55..3e9eb01e843 100644 --- a/src/Orleans.Core/Utils/AsyncEnumerable.cs +++ b/src/Orleans.Core/Utils/AsyncEnumerable.cs @@ -3,6 +3,7 @@ using System.Diagnostics.CodeAnalysis; using System.Threading; using System.Threading.Tasks; +using Orleans.Internal; namespace Orleans.Runtime.Utilities { @@ -136,12 +137,12 @@ private enum PublishResult private sealed class AsyncEnumerator : IAsyncEnumerator { - private readonly TaskCompletionSource _cancellation = new(TaskCreationOptions.RunContinuationsAsynchronously); - private readonly CancellationTokenRegistration _registration; + private readonly CancellationTokenSource _cts; private Element _current; - public AsyncEnumerator(Element initial, CancellationToken cancellation) + public AsyncEnumerator(Element initial, CancellationToken cancellationToken) { + _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); if (!initial.IsValid) { _current = initial; @@ -152,38 +153,25 @@ public AsyncEnumerator(Element initial, CancellationToken cancellation) result.SetNext(initial); _current = result; } - - if (cancellation.CanBeCanceled) - { - _registration = cancellation.Register(() => _cancellation.TrySetResult()); - } } T IAsyncEnumerator.Current => _current.Value; async ValueTask IAsyncEnumerator.MoveNextAsync() { - if (_current.IsDisposed || _cancellation.Task.IsCompleted) - { - return false; - } - - var next = _current.NextAsync(); - var cancellationTask = _cancellation.Task; - var result = await Task.WhenAny(cancellationTask, next); - if (ReferenceEquals(result, cancellationTask)) + if (_current.IsDisposed) { return false; } - _current = await next; + _current = await _current.NextAsync().WaitAsync(_cts.Token); return _current.IsValid; } async ValueTask IAsyncDisposable.DisposeAsync() { - _cancellation.TrySetResult(); - await _registration.DisposeAsync(); + await _cts.CancelAsync().SuppressThrowing(); + _cts.Dispose(); } } diff --git a/src/Orleans.Hosting.Kubernetes/KubernetesClusterAgent.cs b/src/Orleans.Hosting.Kubernetes/KubernetesClusterAgent.cs index 8bfabe2086f..9ac04ceb566 100644 --- a/src/Orleans.Hosting.Kubernetes/KubernetesClusterAgent.cs +++ b/src/Orleans.Hosting.Kubernetes/KubernetesClusterAgent.cs @@ -271,7 +271,10 @@ private async Task MonitorOrleansClustering() previous = update; } } - catch (Exception exception) when (!(_shutdownToken.IsCancellationRequested && (exception is TaskCanceledException || exception is OperationCanceledException))) + catch (OperationCanceledException) when (_shutdownToken.IsCancellationRequested) + { + } + catch (Exception exception) { if (_logger.IsEnabled(LogLevel.Debug)) { diff --git a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs index e638c0c3e5b..11c788501fa 100644 --- a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs @@ -137,7 +137,9 @@ async Task OnStop(CancellationToken ct) { this.shutdownToken.Cancel(); if (listenToClusterChangeTask != default && !ct.IsCancellationRequested) - await listenToClusterChangeTask.WaitAsync(ct); + { + await listenToClusterChangeTask.WaitAsync(ct).SuppressThrowing(); + } }; lifecycle.Subscribe(nameof(CachedGrainLocator), ServiceLifecycleStage.RuntimeGrainServices, OnStart, OnStop); } diff --git a/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs index 64d41f0f521..a882a7ce301 100644 --- a/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs @@ -394,6 +394,7 @@ private async Task Run() catch (OperationCanceledException) when (_shutdownCts.IsCancellationRequested) { // Ignore during shutdown. + break; } catch (Exception exception) { @@ -492,7 +493,7 @@ private async Task PublishUpdates() { // The target has already seen the latest version for this silo. builder.Remove(silo); - } + } } } diff --git a/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs b/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs index 8ddaf7f2661..90877778434 100644 --- a/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs +++ b/src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs @@ -101,6 +101,10 @@ private async Task ProcessMembershipUpdates() this.observedMembershipVersion = tableSnapshot.Version; } } + catch (OperationCanceledException) when (shutdownCancellation.IsCancellationRequested) + { + // Ignore and continue shutting down. + } catch (Exception exception) when (this.fatalErrorHandler.IsUnexpected(exception)) { this.fatalErrorHandler.OnFatalException(this, nameof(ProcessMembershipUpdates), exception); diff --git a/src/Orleans.Runtime/MembershipService/ClusterMembershipService.cs b/src/Orleans.Runtime/MembershipService/ClusterMembershipService.cs index db926a662f7..11995b94264 100644 --- a/src/Orleans.Runtime/MembershipService/ClusterMembershipService.cs +++ b/src/Orleans.Runtime/MembershipService/ClusterMembershipService.cs @@ -87,6 +87,10 @@ private async Task ProcessMembershipUpdates(CancellationToken ct) this.updates.TryPublish(tableSnapshot.CreateClusterMembershipSnapshot()); } } + catch (OperationCanceledException) when (ct.IsCancellationRequested) + { + // Ignore and continue shutting down. + } catch (Exception exception) when (this.fatalErrorHandler.IsUnexpected(exception)) { this.log.LogError(exception, "Error processing membership updates"); diff --git a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs index a10efa49b89..cb7c362fc30 100644 --- a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs +++ b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs @@ -87,6 +87,10 @@ private async Task ProcessMembershipUpdates() previous = snapshot; } } + catch (OperationCanceledException) when (_cancellation.IsCancellationRequested) + { + // Ignore and continue shutting down. + } catch (Exception exception) when (_fatalErrorHandler.IsUnexpected(exception)) { _logger.LogError(exception, "Error processing membership updates."); diff --git a/src/Orleans.Streaming/QueueBalancer/QueueBalancerBase.cs b/src/Orleans.Streaming/QueueBalancer/QueueBalancerBase.cs index 3eccd6c9123..888c911c1d9 100644 --- a/src/Orleans.Streaming/QueueBalancer/QueueBalancerBase.cs +++ b/src/Orleans.Streaming/QueueBalancer/QueueBalancerBase.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Orleans.Internal; using Orleans.Runtime; using Orleans.Runtime.Internal; @@ -65,7 +66,7 @@ public virtual async Task Shutdown() Logger.LogError(exc, "Error signaling shutdown token."); } - await _listenForClusterChangesTask; + await _listenForClusterChangesTask.SuppressThrowing(); } ///