Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions src/Orleans.ScheduledJobs/LocalScheduledJobManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ private async Task Stop(CancellationToken ct)

if (_listenForClusterChangesTask is not null)
{
await _listenForClusterChangesTask;
await _listenForClusterChangesTask.SuppressThrowing();
}

if (_periodicCheckTask is not null)
{
await _periodicCheckTask;
await _periodicCheckTask.SuppressThrowing();
}

await Task.WhenAll(_runningShards.Values.ToArray());
Expand Down Expand Up @@ -180,26 +180,36 @@ private async Task ProcessMembershipUpdates()
{
await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding | ConfigureAwaitOptions.ContinueOnCapturedContext);
var current = new HashSet<SiloAddress>();
await foreach (var membershipSnapshot in _clusterMembershipUpdates.WithCancellation(_cts.Token))

try
{
try
await foreach (var membershipSnapshot in _clusterMembershipUpdates.WithCancellation(_cts.Token))
{
// Get active members
var update = new HashSet<SiloAddress>(membershipSnapshot.Members.Values
.Where(member => member.Status == SiloStatus.Active)
.Select(member => member.SiloAddress));
try
{
// Get active members
var update = new HashSet<SiloAddress>(membershipSnapshot.Members.Values
.Where(member => member.Status == SiloStatus.Active)
.Select(member => member.SiloAddress));

// If active list has changed, trigger immediate shard check
if (!current.SetEquals(update))
// If active list has changed, trigger immediate shard check
if (!current.SetEquals(update))
{
current = update;
_shardCheckSignal.Release();
}
}
catch (Exception exception)
{
current = update;
_shardCheckSignal.Release();
LogErrorProcessingClusterMembership(_logger, exception);
}
}
catch (Exception exception)
}
catch (OperationCanceledException)
{
if (!_cts.Token.IsCancellationRequested)
{
LogErrorProcessingClusterMembership(_logger, exception);
throw;
}
}
}
Expand All @@ -210,14 +220,19 @@ private async Task PeriodicShardCheck()

using var timer = new PeriodicTimer(TimeSpan.FromMinutes(10));

Task timerTask = Task.CompletedTask;
while (!_cts.Token.IsCancellationRequested)
{
try
{
// Wait for either periodic timer OR signal from membership changes
var timerTask = timer.WaitForNextTickAsync(_cts.Token);
if (timerTask.IsCompleted)
{
timerTask = timer.WaitForNextTickAsync(_cts.Token).AsTask();
}

var signalTask = _shardCheckSignal.WaitAsync(_cts.Token);
await Task.WhenAny(timerTask.AsTask(), signalTask);
await Task.WhenAny(timerTask, signalTask);

LogCheckingPendingShards(_logger);

Expand Down Expand Up @@ -259,6 +274,7 @@ private async Task PeriodicShardCheck()
catch (Exception ex)
{
LogErrorInPeriodicCheck(_logger, ex);
await Task.Delay(TimeSpan.FromSeconds(5), _cts.Token).SuppressThrowing();
}
}
}
Expand Down
Loading