Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 8 additions & 20 deletions src/Orleans.Core/Utils/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Orleans.Internal;

namespace Orleans.Runtime.Utilities
{
Expand Down Expand Up @@ -136,12 +137,12 @@ private enum PublishResult

private sealed class AsyncEnumerator : IAsyncEnumerator<T>
{
private readonly TaskCompletionSource _cancellation = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly CancellationTokenRegistration _registration;
private readonly CancellationTokenSource _cts;
private Element _current;

public AsyncEnumerator(Element initial, CancellationToken cancellation)
public AsyncEnumerator(Element initial, CancellationToken cancellationToken)
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
if (!initial.IsValid)
{
_current = initial;
Expand All @@ -152,38 +153,25 @@ public AsyncEnumerator(Element initial, CancellationToken cancellation)
result.SetNext(initial);
_current = result;
}

if (cancellation.CanBeCanceled)
{
_registration = cancellation.Register(() => _cancellation.TrySetResult());
}
}

T IAsyncEnumerator<T>.Current => _current.Value;

async ValueTask<bool> IAsyncEnumerator<T>.MoveNextAsync()
{
if (_current.IsDisposed || _cancellation.Task.IsCompleted)
{
return false;
}

var next = _current.NextAsync();
var cancellationTask = _cancellation.Task;
var result = await Task.WhenAny(cancellationTask, next);
if (ReferenceEquals(result, cancellationTask))
if (_current.IsDisposed)
{
return false;
}

_current = await next;
_current = await _current.NextAsync().WaitAsync(_cts.Token);
return _current.IsValid;
}

async ValueTask IAsyncDisposable.DisposeAsync()
{
_cancellation.TrySetResult();
await _registration.DisposeAsync();
await _cts.CancelAsync().SuppressThrowing();
_cts.Dispose();
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/Orleans.Hosting.Kubernetes/KubernetesClusterAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,10 @@ private async Task MonitorOrleansClustering()
previous = update;
}
}
catch (Exception exception) when (!(_shutdownToken.IsCancellationRequested && (exception is TaskCanceledException || exception is OperationCanceledException)))
catch (OperationCanceledException) when (_shutdownToken.IsCancellationRequested)
{
}
catch (Exception exception)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
Expand Down
4 changes: 3 additions & 1 deletion src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ async Task OnStop(CancellationToken ct)
{
this.shutdownToken.Cancel();
if (listenToClusterChangeTask != default && !ct.IsCancellationRequested)
await listenToClusterChangeTask.WaitAsync(ct);
{
await listenToClusterChangeTask.WaitAsync(ct).SuppressThrowing();
}
};
lifecycle.Subscribe(nameof(CachedGrainLocator), ServiceLifecycleStage.RuntimeGrainServices, OnStart, OnStop);
}
Expand Down
3 changes: 2 additions & 1 deletion src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ private async Task Run()
catch (OperationCanceledException) when (_shutdownCts.IsCancellationRequested)
{
// Ignore during shutdown.
break;
}
catch (Exception exception)
{
Expand Down Expand Up @@ -492,7 +493,7 @@ private async Task PublishUpdates()
{
// The target has already seen the latest version for this silo.
builder.Remove(silo);
}
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/Orleans.Runtime/MembershipService/ClusterHealthMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ private async Task ProcessMembershipUpdates()
this.observedMembershipVersion = tableSnapshot.Version;
}
}
catch (OperationCanceledException) when (shutdownCancellation.IsCancellationRequested)
{
// Ignore and continue shutting down.
}
catch (Exception exception) when (this.fatalErrorHandler.IsUnexpected(exception))
{
this.fatalErrorHandler.OnFatalException(this, nameof(ProcessMembershipUpdates), exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
this.updates.TryPublish(tableSnapshot.CreateClusterMembershipSnapshot());
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Ignore and continue shutting down.
}
catch (Exception exception) when (this.fatalErrorHandler.IsUnexpected(exception))
{
this.log.LogError(exception, "Error processing membership updates");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ private async Task ProcessMembershipUpdates()
previous = snapshot;
}
}
catch (OperationCanceledException) when (_cancellation.IsCancellationRequested)
{
// Ignore and continue shutting down.
}
catch (Exception exception) when (_fatalErrorHandler.IsUnexpected(exception))
{
_logger.LogError(exception, "Error processing membership updates.");
Expand Down
3 changes: 2 additions & 1 deletion src/Orleans.Streaming/QueueBalancer/QueueBalancerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans.Internal;
using Orleans.Runtime;
using Orleans.Runtime.Internal;

Expand Down Expand Up @@ -65,7 +66,7 @@ public virtual async Task Shutdown()
Logger.LogError(exc, "Error signaling shutdown token.");
}

await _listenForClusterChangesTask;
await _listenForClusterChangesTask.SuppressThrowing();
}

/// <inheritdoc/>
Expand Down
Loading