diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs
index 82911e9ab6c..c5ba7fd21ac 100644
--- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs
+++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs
@@ -243,6 +243,17 @@ private async Task ExecutePendingOperations()
LogWarningOperationFailedRetry(logger, exception, op.Name);
await Task.Delay(RetryDelay);
+ if (!this.localDirectory.Running)
+ {
+ return;
+ }
+
+ // Keep retrying failed handoff work, but let later queued operations make progress.
+ lock (this)
+ {
+ this.pendingOperations.Dequeue();
+ this.pendingOperations.Enqueue(op);
+ }
}
}
}
diff --git a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs
index 755f355ccb4..c5e0f07f44f 100644
--- a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs
+++ b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs
@@ -95,12 +95,5 @@ internal interface ILocalGrainDirectory : IDhtGrainDirectory
/// Attempts to find the specified grain in the directory cache.
///
bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address);
-
- ///
- /// For determining message forwarding logic, we sometimes check if a silo is part of this cluster or not
- ///
- /// the address of the silo
- /// true if the silo is known to be part of this cluster
- bool IsSiloInCluster(SiloAddress silo);
}
}
diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs
index a039214acd5..fa261ae209c 100644
--- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs
+++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs
@@ -108,11 +108,11 @@ public LocalGrainDirectory(
DirectoryInstruments.RegisterDirectoryPartitionSizeObserve(() => DirectoryPartition.Count);
DirectoryInstruments.RegisterMyPortionRingDistanceObserve(() => RingDistanceToSuccessor());
- DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => (((float)this.RingDistanceToSuccessor()) / ((float)(int.MaxValue * 2L))) * 100);
+ DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => this.RingDistanceToSuccessor() / (float)(int.MaxValue * 2L) * 100);
DirectoryInstruments.RegisterMyPortionAverageRingPercentageObserve(() =>
{
var ring = this.directoryMembership.MembershipRingList;
- return ring.Count == 0 ? 0 : ((float)100 / (float)ring.Count);
+ return ring.Count == 0 ? 0 : (100 / (float)ring.Count);
});
DirectoryInstruments.RegisterRingSizeObserve(() => this.directoryMembership.MembershipRingList.Count);
_serviceProvider = serviceProvider;
@@ -173,17 +173,16 @@ public async Task StopAsync()
private async Task ProcessMembershipUpdates(CancellationToken cancellationToken)
{
- var snapshot = clusterMembershipService.CurrentSnapshot;
while (!cancellationToken.IsCancellationRequested)
{
try
{
- await ApplyMembershipSnapshot(snapshot);
+ await ApplyMembershipSnapshot();
- await foreach (var update in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken))
+ await foreach (var _ in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken))
{
- snapshot = update;
- await ApplyMembershipSnapshot(snapshot);
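+ // The yielded update is ignored: ApplyMembershipSnapshot reads clusterMembershipService.CurrentSnapshot itself, so the latest snapshot is always applied.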
+ await ApplyMembershipSnapshot();
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
@@ -207,58 +206,58 @@ private async Task ProcessMembershipUpdates(CancellationToken cancellationToken)
LogWarningRefreshingClusterMembershipFailed(refreshException);
}
- snapshot = clusterMembershipService.CurrentSnapshot;
await Task.Delay(RETRY_DELAY, cancellationToken).SuppressThrowing();
}
}
}
- private Task ApplyMembershipSnapshot(ClusterMembershipSnapshot snapshot)
+ private Task ApplyMembershipSnapshot()
{
return CacheValidator.RunOrQueueTask(() =>
{
- ApplyMembershipSnapshotCore(snapshot);
+ ApplyMembershipSnapshotCore();
return Task.CompletedTask;
});
- }
- private void ApplyMembershipSnapshotCore(ClusterMembershipSnapshot snapshot)
- {
- if (!Running)
+ void ApplyMembershipSnapshotCore()
{
- return;
- }
+ if (!Running)
+ {
+ return;
+ }
- var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default;
- if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version)
- {
- return;
- }
+ var snapshot = clusterMembershipService.CurrentSnapshot;
+ var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default;
+ if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version)
+ {
+ return;
+ }
- var previousMembership = CreateDirectoryMembership(previousSnapshot);
- var targetMembership = CreateDirectoryMembership(snapshot);
- this.directoryMembership = targetMembership;
+ var previousMembership = CreateDirectoryMembership(previousSnapshot);
+ var targetMembership = CreateDirectoryMembership(snapshot);
+ this.directoryMembership = targetMembership;
- var removedSilos = GetMembershipDifference(previousMembership, targetMembership);
- var addedSilos = GetMembershipDifference(targetMembership, previousMembership);
+ var removedSilos = GetMembershipDifference(previousMembership, targetMembership);
+ var addedSilos = GetMembershipDifference(targetMembership, previousMembership);
- ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership);
- AdjustLocalDirectory(snapshot);
- AdjustLocalCache(snapshot, targetMembership);
+ ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership);
+ AdjustLocalDirectory(snapshot);
+ AdjustLocalCache(snapshot, targetMembership);
- foreach (var silo in removedSilos)
- {
- LogDebugSiloRemovedSilo(MyAddress, silo);
- }
+ foreach (var silo in removedSilos)
+ {
+ LogDebugSiloRemovedSilo(MyAddress, silo);
+ }
- foreach (var silo in addedSilos)
- {
- HandoffManager.ProcessSiloAddEvent(silo);
- LogDebugSiloAddedSilo(MyAddress, silo);
- }
+ foreach (var silo in addedSilos)
+ {
+ HandoffManager.ProcessSiloAddEvent(silo);
+ LogDebugSiloAddedSilo(MyAddress, silo);
+ }
- appliedClusterMembershipSnapshot = snapshot;
- hasAppliedClusterMembershipSnapshot = true;
+ appliedClusterMembershipSnapshot = snapshot;
+ hasAppliedClusterMembershipSnapshot = true;
+ }
}
private void ProcessSiloStatusChanges(
@@ -352,14 +351,12 @@ private async Task RefreshMembershipIfNewer(MembershipVersion targetVersion)
return;
}
- var snapshot = clusterMembershipService.CurrentSnapshot;
- if (targetVersion > snapshot.Version)
+ if (targetVersion > clusterMembershipService.CurrentSnapshot.Version)
{
await clusterMembershipService.Refresh(targetVersion);
- snapshot = clusterMembershipService.CurrentSnapshot;
}
- await ApplyMembershipSnapshot(snapshot);
+ await ApplyMembershipSnapshot();
}
private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddress updatedSilo, SiloStatus status)
@@ -406,9 +403,9 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr
return;
}
- LogInfoSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo));
+ LogInfoSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo), status);
- var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration.";
+ var reasonText = $"This activation is being deactivated because server {updatedSilo} entered status {status} and was responsible for this activation's grain directory registration.";
var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText);
foreach (var activation in activationsToShutdown)
{
@@ -472,9 +469,14 @@ private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipS
if (snapshot.Members.TryGetValue(silo, out var member))
{
+ // If this is a known host, remove the activation if the host is dead.
return member.Status == SiloStatus.Dead;
}
+ // If this is not a known host, remove the activation if it was registered at an older membership version.
+ // This indicates that the host must have been removed.
+ // Hosts cannot activate grains before they are active, and we ensure that we refresh the membership before processing messages,
+ // so this is a reliable indicator of a defunct activation.
return address.MembershipVersion < snapshot.Version;
}
@@ -763,7 +765,6 @@ private void UnregisterOrPutInForwardList(List addresses, Unregist
}
}
-
public async Task UnregisterManyAsync(List addresses, UnregistrationCause cause, int hopCount)
{
if (hopCount > 0)
@@ -1021,11 +1022,6 @@ private static int CompareSiloAddress(SiloAddress left, SiloAddress right)
return hashComparison != 0 ? hashComparison : left.CompareTo(right);
}
- public bool IsSiloInCluster(SiloAddress silo)
- {
- return this.directoryMembership.MembershipCache.Contains(silo);
- }
-
public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) => this.DirectoryCache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0);
public bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address) => (address = GetLocalCacheData(grainId)) is not null;
void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle)
@@ -1070,13 +1066,13 @@ private readonly struct SiloHashLogValue(SiloAddress? silo)
[LoggerMessage(
Level = LogLevel.Information,
EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification,
- Message = "LocalGrainDirectory is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids."
+ Message = "LocalGrainDirectory is deactivating {Count} activations because silo {Silo} entered status {Status} and was the primary directory partition for these grain ids."
)]
- private partial void LogInfoSiloStatusChangeNotification(int count, SiloAddressLogValue silo);
+ private partial void LogInfoSiloStatusChangeNotification(int count, SiloAddressLogValue silo, SiloStatus status);
[LoggerMessage(
Level = LogLevel.Error,
- EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception,
+ EventId = (int)ErrorCode.Catalog_DeactivateActivation_Exception,
Message = "LocalGrainDirectory has thrown an exception while deactivating activation {GrainId} due to removal of silo {Silo}."
)]
private partial void LogErrorDeactivatingActivationForRemovedSilo(Exception exception, GrainId grainId, SiloAddressLogValue silo);
@@ -1215,27 +1211,21 @@ private readonly struct SiloHashLogValue(SiloAddress? silo)
)]
private partial void LogWarningDeleteGrainAsyncNotOwner(GrainId grainId, SiloAddress? forwardAddress, int hopCount);
- private class DirectoryMembership
+ private class DirectoryMembership(ImmutableList membershipRingList, ImmutableHashSet membershipCache)
{
- public DirectoryMembership(ImmutableList membershipRingList, ImmutableHashSet membershipCache)
- {
- this.MembershipRingList = membershipRingList;
- this.MembershipCache = membershipCache;
- }
-
- public static DirectoryMembership Default { get; } = new DirectoryMembership(ImmutableList.Empty, ImmutableHashSet.Empty);
+ public static DirectoryMembership Default { get; } = new DirectoryMembership([], []);
public static DirectoryMembership Create(IEnumerable members)
{
var builder = ImmutableList.CreateBuilder();
builder.AddRange(members);
- builder.Sort(LocalGrainDirectory.CompareSiloAddress);
+ builder.Sort(CompareSiloAddress);
var ring = builder.ToImmutable();
- return new DirectoryMembership(ring, ring.ToImmutableHashSet());
+ return new DirectoryMembership(ring, [.. ring]);
}
- public ImmutableList MembershipRingList { get; }
- public ImmutableHashSet MembershipCache { get; }
+ public ImmutableList MembershipRingList { get; } = membershipRingList;
+ public ImmutableHashSet MembershipCache { get; } = membershipCache;
}
}
}