From a96b43048dcb356985a8b1700ddab5a72582c3fc Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 17:32:50 -0700 Subject: [PATCH 1/4] Simplify LocalGrainDirectory membership updates Apply the latest cluster membership snapshot directly, remove the unused IsSiloInCluster contract, and simplify related LocalGrainDirectory expressions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/ILocalGrainDirectory.cs | 7 -- .../GrainDirectory/LocalGrainDirectory.cs | 109 ++++++++---------- 2 files changed, 47 insertions(+), 69 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs index 755f355ccb4..c5e0f07f44f 100644 --- a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs @@ -95,12 +95,5 @@ internal interface ILocalGrainDirectory : IDhtGrainDirectory /// Attempts to find the specified grain in the directory cache. /// bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address); - - /// - /// For determining message forwarding logic, we sometimes check if a silo is part of this cluster or not - /// - /// the address of the silo - /// true if the silo is known to be part of this cluster - bool IsSiloInCluster(SiloAddress silo); } } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index a039214acd5..4b4a192e8b1 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -108,11 +108,11 @@ public LocalGrainDirectory( DirectoryInstruments.RegisterDirectoryPartitionSizeObserve(() => DirectoryPartition.Count); DirectoryInstruments.RegisterMyPortionRingDistanceObserve(() => RingDistanceToSuccessor()); - DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => (((float)this.RingDistanceToSuccessor()) / ((float)(int.MaxValue * 2L))) * 100); + DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => this.RingDistanceToSuccessor() / (float)(int.MaxValue * 2L) * 100); DirectoryInstruments.RegisterMyPortionAverageRingPercentageObserve(() => { var ring = this.directoryMembership.MembershipRingList; - return ring.Count == 0 ? 0 : ((float)100 / (float)ring.Count); + return ring.Count == 0 ? 0 : (100 / (float)ring.Count); }); DirectoryInstruments.RegisterRingSizeObserve(() => this.directoryMembership.MembershipRingList.Count); _serviceProvider = serviceProvider; @@ -173,17 +173,16 @@ public async Task StopAsync() private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) { - var snapshot = clusterMembershipService.CurrentSnapshot; while (!cancellationToken.IsCancellationRequested) { try { - await ApplyMembershipSnapshot(snapshot); + await ApplyMembershipSnapshot(); - await foreach (var update in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken)) + await foreach (var _ in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken)) { - snapshot = update; - await ApplyMembershipSnapshot(snapshot); + // Always apply the latest snapshot. + await ApplyMembershipSnapshot(); } } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) @@ -207,58 +206,58 @@ private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) LogWarningRefreshingClusterMembershipFailed(refreshException); } - snapshot = clusterMembershipService.CurrentSnapshot; await Task.Delay(RETRY_DELAY, cancellationToken).SuppressThrowing(); } } } - private Task ApplyMembershipSnapshot(ClusterMembershipSnapshot snapshot) + private Task ApplyMembershipSnapshot() { return CacheValidator.RunOrQueueTask(() => { - ApplyMembershipSnapshotCore(snapshot); + ApplyMembershipSnapshotCore(); return Task.CompletedTask; }); - } - private void ApplyMembershipSnapshotCore(ClusterMembershipSnapshot snapshot) - { - if (!Running) + void ApplyMembershipSnapshotCore() { - return; - } + if (!Running) + { + return; + } - var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default; - if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version) - { - return; - } + var snapshot = clusterMembershipService.CurrentSnapshot; + var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default; + if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version) + { + return; + } - var previousMembership = CreateDirectoryMembership(previousSnapshot); - var targetMembership = CreateDirectoryMembership(snapshot); - this.directoryMembership = targetMembership; + var previousMembership = CreateDirectoryMembership(previousSnapshot); + var targetMembership = CreateDirectoryMembership(snapshot); + this.directoryMembership = targetMembership; - var removedSilos = GetMembershipDifference(previousMembership, targetMembership); - var addedSilos = GetMembershipDifference(targetMembership, previousMembership); + var removedSilos = GetMembershipDifference(previousMembership, targetMembership); + var addedSilos = GetMembershipDifference(targetMembership, previousMembership); - ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership); - AdjustLocalDirectory(snapshot); - AdjustLocalCache(snapshot, targetMembership); + ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership); + AdjustLocalDirectory(snapshot); + AdjustLocalCache(snapshot, targetMembership); - foreach (var silo in removedSilos) - { - LogDebugSiloRemovedSilo(MyAddress, silo); - } + foreach (var silo in removedSilos) + { + LogDebugSiloRemovedSilo(MyAddress, silo); + } - foreach (var silo in addedSilos) - { - HandoffManager.ProcessSiloAddEvent(silo); - LogDebugSiloAddedSilo(MyAddress, silo); - } + foreach (var silo in addedSilos) + { + HandoffManager.ProcessSiloAddEvent(silo); + LogDebugSiloAddedSilo(MyAddress, silo); + } - appliedClusterMembershipSnapshot = snapshot; - hasAppliedClusterMembershipSnapshot = true; + appliedClusterMembershipSnapshot = snapshot; + hasAppliedClusterMembershipSnapshot = true; + } } private void ProcessSiloStatusChanges( @@ -352,14 +351,12 @@ private async Task RefreshMembershipIfNewer(MembershipVersion targetVersion) return; } - var snapshot = clusterMembershipService.CurrentSnapshot; - if (targetVersion > snapshot.Version) + if (targetVersion > clusterMembershipService.CurrentSnapshot.Version) { await clusterMembershipService.Refresh(targetVersion); - snapshot = clusterMembershipService.CurrentSnapshot; } - await ApplyMembershipSnapshot(snapshot); + await ApplyMembershipSnapshot(); } private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddress updatedSilo, SiloStatus status) @@ -763,7 +760,6 @@ private void UnregisterOrPutInForwardList(List addresses, Unregist } } - public async Task UnregisterManyAsync(List addresses, UnregistrationCause cause, int hopCount) { if (hopCount > 0) @@ -1021,11 +1017,6 @@ private static int CompareSiloAddress(SiloAddress left, SiloAddress right) return hashComparison != 0 ? hashComparison : left.CompareTo(right); } - public bool IsSiloInCluster(SiloAddress silo) - { - return this.directoryMembership.MembershipCache.Contains(silo); - } - public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) => this.DirectoryCache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); public bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address) => (address = GetLocalCacheData(grainId)) is not null; void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) @@ -1215,27 +1206,21 @@ private readonly struct SiloHashLogValue(SiloAddress? silo) )] private partial void LogWarningDeleteGrainAsyncNotOwner(GrainId grainId, SiloAddress? forwardAddress, int hopCount); - private class DirectoryMembership + private class DirectoryMembership(ImmutableList membershipRingList, ImmutableHashSet membershipCache) { - public DirectoryMembership(ImmutableList membershipRingList, ImmutableHashSet membershipCache) - { - this.MembershipRingList = membershipRingList; - this.MembershipCache = membershipCache; - } - - public static DirectoryMembership Default { get; } = new DirectoryMembership(ImmutableList.Empty, ImmutableHashSet.Empty); + public static DirectoryMembership Default { get; } = new DirectoryMembership([], []); public static DirectoryMembership Create(IEnumerable members) { var builder = ImmutableList.CreateBuilder(); builder.AddRange(members); - builder.Sort(LocalGrainDirectory.CompareSiloAddress); + builder.Sort(CompareSiloAddress); var ring = builder.ToImmutable(); - return new DirectoryMembership(ring, ring.ToImmutableHashSet()); + return new DirectoryMembership(ring, [.. ring]); } - public ImmutableList MembershipRingList { get; } - public ImmutableHashSet MembershipCache { get; } + public ImmutableList MembershipRingList { get; } = membershipRingList; + public ImmutableHashSet MembershipCache { get; } = membershipCache; } } } From c43d4ba0e1acf546aa1b872a4954de477303b3e9 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 17:34:39 -0700 Subject: [PATCH 2/4] Document defunct activation cleanup Clarify why LocalGrainDirectory removes dead-silo activations and stale unknown-silo activations during membership reconciliation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 4b4a192e8b1..342d7816dc4 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -469,9 +469,14 @@ private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipS if (snapshot.Members.TryGetValue(silo, out var member)) { + // If this is a known host, remove the activation if the host is dead. return member.Status == SiloStatus.Dead; } + // If this is not a known host, remove the activation if it was registered at an older membership version. + // This indicates that the host must have been removed. + // Hosts cannot activate grains before they are active, and we ensure that we refresh the membership before processing messages, + // so this is a reliable indicator of a defunct activation. return address.MembershipVersion < snapshot.Version; } From 141b6c766fcbb37433c610adc5a0970c7c0836e5 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 17:56:00 -0700 Subject: [PATCH 3/4] Address PR review feedback Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/LocalGrainDirectory.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 342d7816dc4..fa261ae209c 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -403,9 +403,9 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr return; } - LogInfoSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo)); + LogInfoSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo), status); - var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration."; + var reasonText = $"This activation is being deactivated because server {updatedSilo} entered status {status} and was responsible for this activation's grain directory registration."; var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText); foreach (var activation in activationsToShutdown) { @@ -1066,13 +1066,13 @@ private readonly struct SiloHashLogValue(SiloAddress? silo) [LoggerMessage( Level = LogLevel.Information, EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification, - Message = "LocalGrainDirectory is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids." + Message = "LocalGrainDirectory is deactivating {Count} activations because silo {Silo} entered status {Status} and was the primary directory partition for these grain ids." )] - private partial void LogInfoSiloStatusChangeNotification(int count, SiloAddressLogValue silo); + private partial void LogInfoSiloStatusChangeNotification(int count, SiloAddressLogValue silo, SiloStatus status); [LoggerMessage( Level = LogLevel.Error, - EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception, + EventId = (int)ErrorCode.Catalog_DeactivateActivation_Exception, Message = "LocalGrainDirectory has thrown an exception while deactivating activation {GrainId} due to removal of silo {Silo}." )] private partial void LogErrorDeactivatingActivationForRemovedSilo(Exception exception, GrainId grainId, SiloAddressLogValue silo); From 834b311c0ba4cb17925245abdebe22089e454958 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Tue, 12 May 2026 09:38:14 -0700 Subject: [PATCH 4/4] Avoid starving handoff operations on retry Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/GrainDirectoryHandoffManager.cs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs index 82911e9ab6c..c5ba7fd21ac 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs @@ -243,6 +243,17 @@ private async Task ExecutePendingOperations() LogWarningOperationFailedRetry(logger, exception, op.Name); await Task.Delay(RetryDelay); + if (!this.localDirectory.Running) + { + return; + } + + // Keep retrying failed handoff work, but let later queued operations make progress. + lock (this) + { + this.pendingOperations.Dequeue(); + this.pendingOperations.Enqueue(op); + } } } }