Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private async Task AcceptExistingRegistrationsAsync(List<GrainAddress> singleAct

for (var i = singleActivations.Count - 1; i >= 0; i--)
{
if (singleActivations[i].SiloAddress is not { } siloAddress || this.siloStatusOracle.GetApproximateSiloStatus(siloAddress).IsTerminating())
if (singleActivations[i].SiloAddress is not { } siloAddress || this.siloStatusOracle.GetApproximateSiloStatus(siloAddress) == SiloStatus.Dead)
{
singleActivations.RemoveAt(i);
}
Expand Down
54 changes: 43 additions & 11 deletions src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Orleans.Configuration;
using Orleans.GrainDirectory;
using Orleans.Internal;
using Orleans.Runtime.Internal;
using Orleans.Runtime.Scheduler;

namespace Orleans.Runtime.GrainDirectory
Expand Down Expand Up @@ -122,6 +123,7 @@ public void Start()
LogDebugStart();

Running = true;
Comment thread
ReubenBond marked this conversation as resolved.
using var _ = new ExecutionContextSuppressor();
membershipUpdatesTask = Task.Run(() => ProcessMembershipUpdates(_membershipUpdatesCancellation.Token));
}

Expand All @@ -141,14 +143,28 @@ public async Task StopAsync()
//mark Running as false will exclude myself from CalculateGrainDirectoryPartition(grainId)
Running = false;
_membershipUpdatesCancellation.Cancel();
if (membershipUpdatesTask is { } task)
try
{
await task.SuppressThrowing();
if (membershipUpdatesTask is { } task)
{
await task.SuppressThrowing();
}
}
finally
{
_membershipUpdatesCancellation.Dispose();
}

if (this.disposeDirectoryCache)
{
await GrainDirectoryCacheFactory.DisposeGrainDirectoryCacheAsync(DirectoryCache).SuppressThrowing();
try
{
await GrainDirectoryCacheFactory.DisposeGrainDirectoryCacheAsync(DirectoryCache);
}
catch (Exception exception)
{
LogWarningDisposeDirectoryCacheFailed(exception);
}
}

DirectoryPartition.Clear();
Expand Down Expand Up @@ -227,8 +243,8 @@ private void ApplyMembershipSnapshotCore(ClusterMembershipSnapshot snapshot)
var addedSilos = GetMembershipDifference(targetMembership, previousMembership);

ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership);
AdjustLocalDirectory(targetMembership);
AdjustLocalCache(targetMembership);
AdjustLocalDirectory(snapshot);
AdjustLocalCache(snapshot, targetMembership);

foreach (var silo in removedSilos)
{
Expand Down Expand Up @@ -365,14 +381,14 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr
}
}

private void AdjustLocalDirectory(DirectoryMembership targetMembership)
private void AdjustLocalDirectory(ClusterMembershipSnapshot snapshot)
{
var activationsToRemove = new List<(GrainId, ActivationId)>();
foreach (var entry in this.DirectoryPartition.GetItems())
{
if (entry.Value.Activation is { } address)
{
if (IsDefunctActivationSilo(targetMembership, address.SiloAddress))
if (IsDefunctActivation(address, snapshot))
{
activationsToRemove.Add((entry.Key, address.ActivationId));
}
Expand All @@ -385,7 +401,7 @@ private void AdjustLocalDirectory(DirectoryMembership targetMembership)
}
}

private void AdjustLocalCache(DirectoryMembership targetMembership)
private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembership targetMembership)
{
foreach (var tuple in DirectoryCache.KeyValues)
{
Expand All @@ -398,16 +414,26 @@ private void AdjustLocalCache(DirectoryMembership targetMembership)
continue;
}

if (IsDefunctActivationSilo(targetMembership, activationAddress.SiloAddress))
if (IsDefunctActivation(activationAddress, snapshot))
{
DirectoryCache.Remove(activationAddress.GrainId);
}
}
}

private static bool IsDefunctActivationSilo(DirectoryMembership targetMembership, SiloAddress? silo)
private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot)
{
return silo is null || !targetMembership.MembershipCache.Contains(silo);
if (address.SiloAddress is not { } silo)
{
return true;
}

if (snapshot.Members.TryGetValue(silo, out var member))
{
return member.Status == SiloStatus.Dead;
}

return address.MembershipVersion < snapshot.Version;
}

internal SiloAddress? FindPredecessor(SiloAddress silo)
Expand Down Expand Up @@ -1017,6 +1043,12 @@ private readonly struct SiloHashLogValue(SiloAddress? silo)
)]
private partial void LogWarningRefreshingClusterMembershipFailed(Exception exception);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Failed to dispose the grain directory cache."
)]
private partial void LogWarningDisposeDirectoryCacheFailed(Exception exception);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Silo {LocalSilo} removed silo {OtherSilo}"
Expand Down
Loading