From 4f3a189f2edc7c213860a2232c6dc40748d320ee Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 15:20:18 -0700 Subject: [PATCH 01/15] Fix LocalGrainDirectory membership reconciliation Process cluster membership as snapshots in LocalGrainDirectory so directory state can be reconciled and retried after failures. Move silo-removal activation cleanup out of Catalog and keep handoff operations retrying until success, obsolescence, or shutdown. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/Orleans.Runtime/Catalog/Catalog.cs | 93 ------ .../GrainDirectoryHandoffManager.cs | 54 ++- .../GrainDirectory/LocalGrainDirectory.cs | 313 ++++++++++++++---- .../Directory/CachedGrainLocatorTests.cs | 2 + 4 files changed, 275 insertions(+), 187 deletions(-) diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index 0eff3079ee0..dfca80b796d 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -11,9 +11,7 @@ namespace Orleans.Runtime { internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant { - private readonly SiloAddress _siloAddress; private readonly ActivationCollector activationCollector; - private readonly GrainDirectoryResolver grainDirectoryResolver; private readonly ActivationDirectory activations; private readonly IServiceProvider serviceProvider; private readonly ILogger logger; @@ -30,8 +28,6 @@ internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecyclePartic #endif public Catalog( - ILocalSiloDetails localSiloDetails, - GrainDirectoryResolver grainDirectoryResolver, ActivationDirectory activationDirectory, ActivationCollector activationCollector, IServiceProvider serviceProvider, @@ -40,8 +36,6 @@ public Catalog( SystemTargetShared shared) : base(Constants.CatalogType, shared) { - this._siloAddress = localSiloDetails.SiloAddress; - this.grainDirectoryResolver = grainDirectoryResolver; this.activations = activationDirectory; this.serviceProvider = serviceProvider; this.grainActivator = grainActivator; @@ -328,99 +322,12 @@ await Parallel.ForEachAsync(addresses, (activationAddress, cancellationToken) => }); } - // TODO move this logic in the LocalGrainDirectory - internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress updatedSilo, SiloStatus status) - { - // ignore joining events and also events on myself. - if (updatedSilo.Equals(_siloAddress)) return; - - // We deactivate those activations when silo goes either of ShuttingDown/Stopping/Dead states, - // since this is what Directory is doing as well. Directory removes a silo based on all those 3 statuses, - // thus it will only deliver a "remove" notification for a given silo once to us. Therefore, we need to react the fist time we are notified. - // We may review the directory behavior in the future and treat ShuttingDown differently ("drain only") and then this code will have to change a well. - if (!status.IsTerminating()) return; - if (status == SiloStatus.Dead) - { - this.RuntimeClient.BreakOutstandingMessagesToSilo(updatedSilo); - } - - var activationsToShutdown = new List(); - try - { - // scan all activations in activation directory and deactivate the ones that the removed silo is their primary partition owner. - // Note: No lock needed here since ActivationDirectory uses ConcurrentDictionary which provides thread-safe enumeration - foreach (var activation in activations) - { - try - { - var activationData = activation.Value; - var placementStrategy = activationData.GetComponent(); - var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true }; - if (!isUsingGrainDirectory || !grainDirectoryResolver.IsUsingDefaultDirectory(activationData.GrainId.Type)) continue; - if (!updatedSilo.Equals(directory.GetPrimaryForGrain(activationData.GrainId))) continue; - - activationsToShutdown.Add(activationData); - } - catch (Exception exc) - { - LogErrorCatalogSiloStatusChangeNotification(new(updatedSilo), exc); - } - } - - if (activationsToShutdown.Count > 0) - { - LogInfoCatalogSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo)); - } - } - finally - { - // outside the lock. - if (activationsToShutdown.Count > 0) - { - var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration."; - var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText); - StartDeactivatingActivations(reason, activationsToShutdown, CancellationToken.None); - } - } - - void StartDeactivatingActivations(DeactivationReason reason, List list, CancellationToken cancellationToken) - { - if (list == null || list.Count == 0) return; - - LogDebugDeactivateActivations(list.Count); - - foreach (var activation in list) - { - activation.Deactivate(reason, cancellationToken); - } - } - } - void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) { // Do nothing, just ensure that this instance is created so that it can register itself in the activation directory. _siloStatusOracle = serviceProvider.GetRequiredService(); } - private readonly struct SiloAddressLogValue(SiloAddress silo) - { - public override string ToString() => silo.ToStringWithHashCode(); - } - - [LoggerMessage( - Level = LogLevel.Error, - EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception, - Message = "Catalog has thrown an exception while handling removal of silo {Silo}" - )] - private partial void LogErrorCatalogSiloStatusChangeNotification(SiloAddressLogValue silo, Exception exception); - - [LoggerMessage( - Level = LogLevel.Information, - EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification, - Message = "Catalog is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids." - )] - private partial void LogInfoCatalogSiloStatusChangeNotification(int count, SiloAddressLogValue silo); - [LoggerMessage( Level = LogLevel.Trace, Message = "Unregistered activation {Activation}")] diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs index 2f8d58c5e41..40804db0e40 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs @@ -15,7 +15,6 @@ namespace Orleans.Runtime.GrainDirectory internal sealed partial class GrainDirectoryHandoffManager { private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(250); - private const int MAX_OPERATION_DEQUEUE = 2; private readonly LocalGrainDirectory localDirectory; private readonly ISiloStatusOracle siloStatusOracle; private readonly IInternalGrainFactory grainFactory; @@ -71,6 +70,11 @@ internal void ProcessSiloAddEvent(SiloAddress addedSilo) private async Task ProcessAddedSiloAsync(SiloAddress addedSilo, List splitPartListSingle) { if (!this.localDirectory.Running) return; + if (!addedSilo.Equals(localDirectory.FindSuccessor(localDirectory.MyAddress))) + { + LogDebugNotImmediateSuccessor(logger, addedSilo); + return; + } if (this.siloStatusOracle.GetApproximateSiloStatus(addedSilo) == SiloStatus.Active) { @@ -124,6 +128,16 @@ private async Task AcceptExistingRegistrationsAsync(List singleAct { if (!this.localDirectory.Running) return; + for (var i = singleActivations.Count - 1; i >= 0; i--) + { + if (singleActivations[i].SiloAddress is not { } siloAddress || this.siloStatusOracle.GetApproximateSiloStatus(siloAddress).IsTerminating()) + { + singleActivations.RemoveAt(i); + } + } + + if (singleActivations.Count == 0) return; + LogDebugAcceptingRegistrations(logger, singleActivations.Count); var tasks = singleActivations.Select(addr => this.localDirectory.RegisterAsync(addr, previousAddress: null, 1)).ToArray(); @@ -190,10 +204,7 @@ private void EnqueueOperation(string name, object state, Func= MAX_OPERATION_DEQUEUE) + catch (Exception exception) { - lock (this) + if (!this.localDirectory.Running) { - // Remove the operation from the queue if it was a success - // or if we tried too many times - this.pendingOperations.Dequeue(); + return; } + + LogWarningOperationFailedRetry(logger, exception, op.Name); + await Task.Delay(RetryDelay); } } } @@ -335,10 +334,5 @@ private readonly struct GrainAddressesLogValue(List grainAddresses )] private static partial void LogWarningOperationFailedRetry(ILogger logger, Exception exception, string operation); - [LoggerMessage( - Level = LogLevel.Warning, - Message = "{Operation} failed, will NOT be retried" - )] - private static partial void LogWarningOperationFailedNoRetry(ILogger logger, Exception exception, string operation); } } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 7c97e15c82b..4ca6a201520 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -14,25 +14,32 @@ namespace Orleans.Runtime.GrainDirectory { - internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ISiloStatusListener, ILifecycleParticipant + internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ILifecycleParticipant { private readonly ILogger log; private readonly SiloAddress? seed; private readonly ISiloStatusOracle siloStatusOracle; + private readonly IClusterMembershipService clusterMembershipService; private readonly IInternalGrainFactory grainFactory; + private readonly ActivationDirectory localActivations; + private readonly InsideRuntimeClient runtimeClient; #if NET9_0_OR_GREATER private readonly Lock writeLock = new(); #else private readonly object writeLock = new(); #endif private readonly IServiceProvider _serviceProvider; + private readonly CancellationTokenSource _membershipUpdatesCancellation = new(); private DirectoryMembership directoryMembership = DirectoryMembership.Default; + private ClusterMembershipSnapshot appliedClusterMembershipSnapshot = ClusterMembershipSnapshot.Default; + private GrainDirectoryResolver? grainDirectoryResolver; + private bool hasAppliedClusterMembershipSnapshot; // Consider: move these constants into an appropriate place internal const int HOP_LIMIT = 6; // forward a remote request no more than 5 times public static readonly TimeSpan RETRY_DELAY = TimeSpan.FromMilliseconds(200); // Pause 200ms between forwards to let the membership directory settle down internal bool Running; - private Catalog? _catalog; + private Task? membershipUpdatesTask; internal SiloAddress MyAddress { get; } @@ -51,6 +58,7 @@ public LocalGrainDirectory( IServiceProvider serviceProvider, ILocalSiloDetails siloDetails, ISiloStatusOracle siloStatusOracle, + IClusterMembershipService clusterMembershipService, IInternalGrainFactory grainFactory, Factory grainDirectoryPartitionFactory, IOptions developmentClusterMembershipOptions, @@ -63,7 +71,10 @@ public LocalGrainDirectory( MyAddress = siloDetails.SiloAddress; this.siloStatusOracle = siloStatusOracle; + this.clusterMembershipService = clusterMembershipService; this.grainFactory = grainFactory; + this.localActivations = systemTargetShared.ActivationDirectory; + this.runtimeClient = systemTargetShared.RuntimeClient; DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value, out this.disposeDirectoryCache); @@ -96,8 +107,7 @@ public LocalGrainDirectory( DistributedGrainDirectoryPartitionCompatibilities = compatibilityPartitions.MoveToImmutable(); } - // add myself to the list of members - AddServer(MyAddress); + this.directoryMembership = DirectoryMembership.Create([MyAddress]); DirectoryInstruments.RegisterDirectoryPartitionSizeObserve(() => DirectoryPartition.Count); DirectoryInstruments.RegisterMyPortionRingDistanceObserve(() => RingDistanceToSuccessor()); @@ -116,8 +126,7 @@ public void Start() LogDebugStart(); Running = true; - - siloStatusOracle.SubscribeToSiloStatusEvents(this); + membershipUpdatesTask = Task.Run(() => ProcessMembershipUpdates(_membershipUpdatesCancellation.Token)); } // Note that this implementation stops processing directory change requests (Register, Unregister, etc.) when the Stop event is raised. @@ -135,6 +144,11 @@ public async Task StopAsync() //mark Running as false will exclude myself from CalculateGrainDirectoryPartition(grainId) Running = false; + _membershipUpdatesCancellation.Cancel(); + if (membershipUpdatesTask is { } task) + { + await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + } if (this.disposeDirectoryCache) { @@ -145,68 +159,214 @@ public async Task StopAsync() DirectoryCache.Clear(); } - private void AddServer(SiloAddress silo) + private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) + { + var snapshot = clusterMembershipService.CurrentSnapshot; + while (!cancellationToken.IsCancellationRequested) + { + try + { + await ApplyMembershipSnapshot(snapshot, cancellationToken); + + await foreach (var update in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken)) + { + snapshot = update; + await ApplyMembershipSnapshot(snapshot, cancellationToken); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception exception) + { + LogErrorProcessingMembershipUpdates(exception); + + try + { + await clusterMembershipService.Refresh(cancellationToken: cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception refreshException) + { + LogWarningRefreshingClusterMembershipFailed(refreshException); + } + + snapshot = clusterMembershipService.CurrentSnapshot; + await Task.Delay(RETRY_DELAY, cancellationToken); + } + } + } + + private Task ApplyMembershipSnapshot(ClusterMembershipSnapshot snapshot, CancellationToken cancellationToken) + { + return CacheValidator.RunOrQueueTask(() => + { + cancellationToken.ThrowIfCancellationRequested(); + ApplyMembershipSnapshot(snapshot); + return Task.CompletedTask; + }); + } + + private void ApplyMembershipSnapshot(ClusterMembershipSnapshot snapshot) { + if (!Running) + { + return; + } + lock (this.writeLock) { - var existing = this.directoryMembership; - if (existing.MembershipCache.Contains(silo)) + var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default; + if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version) { - // we have already cached this silo return; } - // insert new silo in the sorted order - long hash = silo.GetConsistentHashCode(); + var previousMembership = CreateDirectoryMembership(previousSnapshot); + var targetMembership = CreateDirectoryMembership(snapshot); + var changes = hasAppliedClusterMembershipSnapshot && previousSnapshot.Version != MembershipVersion.MinValue + ? snapshot.CreateUpdate(previousSnapshot) + : snapshot.AsUpdate(); + var statusChanges = new List(); + foreach (var change in changes.Changes) + { + if (!change.SiloAddress.Equals(MyAddress) && change.Status.IsTerminating()) + { + statusChanges.Add(change); + } + } - // Find the last silo with hash smaller than the new silo, and insert the latter after (this is why we have +1 here) the former. - // Notice that FindLastIndex might return -1 if this should be the first silo in the list, but then - // 'index' will get 0, as needed. - int index = existing.MembershipRingList.FindLastIndex(siloAddr => siloAddr.GetConsistentHashCode() < hash) + 1; + statusChanges.Sort(static (left, right) => CompareSiloAddress(left.SiloAddress, right.SiloAddress)); + foreach (var change in statusChanges) + { + OnSiloStatusChange(previousMembership, change.SiloAddress, change.Status); + } - this.directoryMembership = new DirectoryMembership( - existing.MembershipRingList.Insert(index, silo), - existing.MembershipCache.Add(silo)); + var removedSilos = new List<(SiloAddress Silo, SiloStatus Status)>(); + foreach (var silo in previousMembership.MembershipRingList) + { + if (!silo.Equals(MyAddress) && !targetMembership.MembershipCache.Contains(silo)) + { + var status = snapshot.GetSiloStatus(silo); + removedSilos.Add((silo, status == SiloStatus.None ? SiloStatus.Dead : status)); + } + } - HandoffManager.ProcessSiloAddEvent(silo); + var addedSilos = new List(); + foreach (var silo in targetMembership.MembershipRingList) + { + if (!silo.Equals(MyAddress) && !previousMembership.MembershipCache.Contains(silo)) + { + addedSilos.Add(silo); + } + } + + this.directoryMembership = targetMembership; + + foreach (var (silo, _) in removedSilos) + { + AdjustLocalDirectory(silo, dead: true); + AdjustLocalCache(silo, dead: true); + + LogDebugSiloRemovedSilo(MyAddress, silo); + } - AdjustLocalDirectory(silo, dead: false); - AdjustLocalCache(silo, dead: false); + foreach (var silo in addedSilos) + { + HandoffManager.ProcessSiloAddEvent(silo); - LogDebugSiloAddedSilo(MyAddress, silo); + AdjustLocalDirectory(silo, dead: false); + AdjustLocalCache(silo, dead: false); + + LogDebugSiloAddedSilo(MyAddress, silo); + } + + appliedClusterMembershipSnapshot = snapshot; + hasAppliedClusterMembershipSnapshot = true; } } - private void RemoveServer(SiloAddress silo, SiloStatus status) + private DirectoryMembership CreateDirectoryMembership(ClusterMembershipSnapshot snapshot) { - lock (this.writeLock) + var members = new List(); + foreach (var member in snapshot.Members) { - try + if (member.Value.Status == SiloStatus.Active) { - // Only notify the catalog once. Order is important: call BEFORE updating membershipRingList. - _catalog = _serviceProvider.GetRequiredService(); - _catalog.OnSiloStatusChange(this, silo, status); + members.Add(member.Key); } - catch (Exception exc) + } + + if (Running && !members.Contains(MyAddress)) + { + members.Add(MyAddress); + } + + return DirectoryMembership.Create(members); + } + + private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddress updatedSilo, SiloStatus status) + { + if (updatedSilo.Equals(MyAddress) || !status.IsTerminating()) + { + return; + } + + if (status == SiloStatus.Dead) + { + runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo); + } + + var activationsToShutdown = new List(); + var resolver = grainDirectoryResolver ??= _serviceProvider.GetRequiredService(); + foreach (var activation in localActivations) + { + try { - LogErrorCatalogSiloStatusChangeNotificationException(exc, new(silo)); - } + var activationData = activation.Value; + var placementStrategy = activationData.GetComponent(); + var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true }; + if (!isUsingGrainDirectory || !resolver.IsUsingDefaultDirectory(activationData.GrainId.Type)) + { + continue; + } + + if (!updatedSilo.Equals(CalculateGrainDirectoryPartition(activationData.GrainId, previousMembership))) + { + continue; + } - var existing = this.directoryMembership; - if (!existing.MembershipCache.Contains(silo)) + activationsToShutdown.Add(activationData); + } + catch (Exception exception) { - // we have already removed this silo - return; + LogErrorSiloStatusChangeNotification(new(updatedSilo), exception); } + } - this.directoryMembership = new DirectoryMembership( - existing.MembershipRingList.Remove(silo), - existing.MembershipCache.Remove(silo)); + if (activationsToShutdown.Count == 0) + { + return; + } - AdjustLocalDirectory(silo, dead: true); - AdjustLocalCache(silo, dead: true); + LogInfoSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo)); - LogDebugSiloRemovedSilo(MyAddress, silo); + var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration."; + var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText); + foreach (var activation in activationsToShutdown) + { + try + { + activation.Deactivate(reason, CancellationToken.None); + } + catch (Exception exception) + { + LogErrorDeactivatingActivationForRemovedSilo(exception, activation.GrainId, new(updatedSilo)); + } } } @@ -294,24 +454,6 @@ private void AdjustLocalCache(SiloAddress silo, bool dead) return existing.Count > 1 ? existing[(index + 1) % existing.Count] : null; } - public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus status) - { - // This silo's status has changed - if (!Equals(updatedSilo, MyAddress)) // Status change for some other silo - { - if (status.IsTerminating()) - { - // QueueAction up the "Remove" to run on a system turn - CacheValidator.WorkItemGroup.QueueAction(() => RemoveServer(updatedSilo, status)); - } - else if (status == SiloStatus.Active) // do not do anything with SiloStatus.Starting -- wait until it actually becomes active - { - // QueueAction up the "Remove" to run on a system turn - CacheValidator.WorkItemGroup.QueueAction(() => AddServer(updatedSilo)); - } - } - } - private bool IsValidSilo(SiloAddress? silo) => siloStatusOracle.IsFunctionalDirectory(silo); /// @@ -321,6 +463,9 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta /// /// public SiloAddress? CalculateGrainDirectoryPartition(GrainId grainId) + => CalculateGrainDirectoryPartition(grainId, this.directoryMembership); + + private SiloAddress? CalculateGrainDirectoryPartition(GrainId grainId, DirectoryMembership membership) { // give a special treatment for special grains if (grainId.IsSystemTarget()) @@ -352,7 +497,7 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta // is doing something valuable. bool excludeMySelf = !Running; - var existing = this.directoryMembership; + var existing = membership; if (existing.MembershipRingList.Count == 0) { // If the membership ring is empty, then we're the owner by default unless we're stopping. @@ -812,6 +957,12 @@ private bool IsSiloNextInTheRing(SiloAddress siloAddr, int hash, bool excludeMyS return siloAddr.GetConsistentHashCode() <= hash && (!excludeMySelf || !siloAddr.Equals(MyAddress)); } + private static int CompareSiloAddress(SiloAddress left, SiloAddress right) + { + var hashComparison = left.GetConsistentHashCode().CompareTo(right.GetConsistentHashCode()); + return hashComparison != 0 ? hashComparison : left.CompareTo(right); + } + public bool IsSiloInCluster(SiloAddress silo) { return this.directoryMembership.MembershipCache.Contains(silo); @@ -852,11 +1003,37 @@ private readonly struct SiloHashLogValue(SiloAddress? silo) private partial void LogDebugSiloAddedSilo(SiloAddress siloAddress, SiloAddress otherSiloAddress); [LoggerMessage( - EventId = (int)ErrorCode.Directory_SiloStatusChangeNotification_Exception, Level = LogLevel.Error, - Message = "CatalogSiloStatusListener.SiloStatusChangeNotification has thrown an exception when notified about removed silo {Silo}." + EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception, + Message = "LocalGrainDirectory has thrown an exception while handling removal of silo {Silo}." + )] + private partial void LogErrorSiloStatusChangeNotification(SiloAddressLogValue silo, Exception exception); + + [LoggerMessage( + Level = LogLevel.Information, + EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification, + Message = "LocalGrainDirectory is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids." + )] + private partial void LogInfoSiloStatusChangeNotification(int count, SiloAddressLogValue silo); + + [LoggerMessage( + Level = LogLevel.Error, + EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception, + Message = "LocalGrainDirectory has thrown an exception while deactivating activation {GrainId} due to removal of silo {Silo}." )] - private partial void LogErrorCatalogSiloStatusChangeNotificationException(Exception exception, SiloAddressLogValue silo); + private partial void LogErrorDeactivatingActivationForRemovedSilo(Exception exception, GrainId grainId, SiloAddressLogValue silo); + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Error processing cluster membership updates." + )] + private partial void LogErrorProcessingMembershipUpdates(Exception exception); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Failed to refresh cluster membership after a directory membership update processing error." + )] + private partial void LogWarningRefreshingClusterMembershipFailed(Exception exception); [LoggerMessage( Level = LogLevel.Debug, @@ -984,6 +1161,14 @@ public DirectoryMembership(ImmutableList membershipRingList, Immuta public static DirectoryMembership Default { get; } = new DirectoryMembership(ImmutableList.Empty, ImmutableHashSet.Empty); + public static DirectoryMembership Create(IEnumerable members) + { + var builder = ImmutableList.CreateBuilder(); + builder.AddRange(members); + builder.Sort(LocalGrainDirectory.CompareSiloAddress); + var ring = builder.ToImmutable(); + return new DirectoryMembership(ring, ring.ToImmutableHashSet()); + } public ImmutableList MembershipRingList { get; } public ImmutableHashSet MembershipCache { get; } diff --git a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs index db66ed56d2c..066bbb1aa4d 100644 --- a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs +++ b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs @@ -140,6 +140,7 @@ public async Task LocalGrainDirectoryStopDoesNotDisposeRegisteredCustomCache() localSiloDetails.Name.Returns("TestSilo"); localSiloDetails.ClusterId.Returns("TestCluster"); var siloStatusOracle = Substitute.For(); + var membershipService = new MockClusterMembershipService(); var grainFactory = Substitute.For(); var services = new ServiceCollection() .AddSingleton(cache) @@ -160,6 +161,7 @@ public async Task LocalGrainDirectoryStopDoesNotDisposeRegisteredCustomCache() serviceProvider: services, siloDetails: localSiloDetails, siloStatusOracle: siloStatusOracle, + clusterMembershipService: membershipService.Target, grainFactory: grainFactory, grainDirectoryPartitionFactory: partitionFactory, developmentClusterMembershipOptions: Options.Create(new DevelopmentClusterMembershipOptions()), From 9f9b13dee177121a228c951d739a493aefd58ec9 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 16:29:49 -0700 Subject: [PATCH 02/15] Harden LocalGrainDirectory shutdown cleanup Suppress expected shutdown failures while stopping membership processing and disposing the directory cache. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 4ca6a201520..ef0570ef086 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -10,6 +10,7 @@ using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.GrainDirectory; +using Orleans.Internal; using Orleans.Runtime.Scheduler; namespace Orleans.Runtime.GrainDirectory @@ -147,12 +148,12 @@ public async Task StopAsync() _membershipUpdatesCancellation.Cancel(); if (membershipUpdatesTask is { } task) { - await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing); + await task.SuppressThrowing(); } if (this.disposeDirectoryCache) { - await GrainDirectoryCacheFactory.DisposeGrainDirectoryCacheAsync(DirectoryCache); + await GrainDirectoryCacheFactory.DisposeGrainDirectoryCacheAsync(DirectoryCache).SuppressThrowing(); } DirectoryPartition.Clear(); @@ -196,7 +197,7 @@ private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) } snapshot = clusterMembershipService.CurrentSnapshot; - await Task.Delay(RETRY_DELAY, cancellationToken); + await Task.Delay(RETRY_DELAY, cancellationToken).SuppressThrowing(); } } } From 96004b3ae39fe5a62fb0a3337fe8e791cf4286bb Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 16:41:00 -0700 Subject: [PATCH 03/15] Simplify directory membership reconciliation Remove redundant locking and cancellation from snapshot application, publish the latest directory membership before side effects, and simplify defunct entry cleanup against the current membership. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/LocalGrainDirectory.cs | 167 ++++++++---------- 1 file changed, 74 insertions(+), 93 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index ef0570ef086..154d5dcc36d 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -24,11 +24,6 @@ internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ILifec private readonly IInternalGrainFactory grainFactory; private readonly ActivationDirectory localActivations; private readonly InsideRuntimeClient runtimeClient; -#if NET9_0_OR_GREATER - private readonly Lock writeLock = new(); -#else - private readonly object writeLock = new(); -#endif private readonly IServiceProvider _serviceProvider; private readonly CancellationTokenSource _membershipUpdatesCancellation = new(); private DirectoryMembership directoryMembership = DirectoryMembership.Default; @@ -167,12 +162,12 @@ private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) { try { - await ApplyMembershipSnapshot(snapshot, cancellationToken); + await ApplyMembershipSnapshot(snapshot); await foreach (var update in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken)) { snapshot = update; - await ApplyMembershipSnapshot(snapshot, cancellationToken); + await ApplyMembershipSnapshot(snapshot); } } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) @@ -202,92 +197,75 @@ private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) } } - private Task ApplyMembershipSnapshot(ClusterMembershipSnapshot snapshot, CancellationToken cancellationToken) + private Task ApplyMembershipSnapshot(ClusterMembershipSnapshot snapshot) { return CacheValidator.RunOrQueueTask(() => { - cancellationToken.ThrowIfCancellationRequested(); - ApplyMembershipSnapshot(snapshot); + ApplyMembershipSnapshotCore(snapshot); return Task.CompletedTask; }); } - private void ApplyMembershipSnapshot(ClusterMembershipSnapshot snapshot) + private void ApplyMembershipSnapshotCore(ClusterMembershipSnapshot snapshot) { if (!Running) { return; } - lock (this.writeLock) + var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default; + if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version) { - var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default; - if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version) - { - return; - } + return; + } - var previousMembership = CreateDirectoryMembership(previousSnapshot); - var targetMembership = CreateDirectoryMembership(snapshot); - var changes = hasAppliedClusterMembershipSnapshot && previousSnapshot.Version != MembershipVersion.MinValue - ? snapshot.CreateUpdate(previousSnapshot) - : snapshot.AsUpdate(); - var statusChanges = new List(); - foreach (var change in changes.Changes) - { - if (!change.SiloAddress.Equals(MyAddress) && change.Status.IsTerminating()) - { - statusChanges.Add(change); - } - } + var previousMembership = CreateDirectoryMembership(previousSnapshot); + var targetMembership = CreateDirectoryMembership(snapshot); + this.directoryMembership = targetMembership; - statusChanges.Sort(static (left, right) => CompareSiloAddress(left.SiloAddress, right.SiloAddress)); - foreach (var change in statusChanges) - { - OnSiloStatusChange(previousMembership, change.SiloAddress, change.Status); - } + var removedSilos = GetMembershipDifference(previousMembership, targetMembership); + var addedSilos = GetMembershipDifference(targetMembership, previousMembership); - var removedSilos = new List<(SiloAddress Silo, SiloStatus Status)>(); - foreach (var silo in previousMembership.MembershipRingList) - { - if (!silo.Equals(MyAddress) && !targetMembership.MembershipCache.Contains(silo)) - { - var status = snapshot.GetSiloStatus(silo); - removedSilos.Add((silo, status == SiloStatus.None ? SiloStatus.Dead : status)); - } - } + ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership); + AdjustLocalDirectory(targetMembership); + AdjustLocalCache(targetMembership); - var addedSilos = new List(); - foreach (var silo in targetMembership.MembershipRingList) - { - if (!silo.Equals(MyAddress) && !previousMembership.MembershipCache.Contains(silo)) - { - addedSilos.Add(silo); - } - } - - this.directoryMembership = targetMembership; + foreach (var silo in removedSilos) + { + LogDebugSiloRemovedSilo(MyAddress, silo); + } - foreach (var (silo, _) in removedSilos) - { - AdjustLocalDirectory(silo, dead: true); - AdjustLocalCache(silo, dead: true); + foreach (var silo in addedSilos) + { + HandoffManager.ProcessSiloAddEvent(silo); + LogDebugSiloAddedSilo(MyAddress, silo); + } - LogDebugSiloRemovedSilo(MyAddress, silo); - } + appliedClusterMembershipSnapshot = snapshot; + hasAppliedClusterMembershipSnapshot = true; + } - foreach (var silo in addedSilos) + private void ProcessSiloStatusChanges( + ClusterMembershipSnapshot snapshot, + ClusterMembershipSnapshot previousSnapshot, + DirectoryMembership previousMembership) + { + var changes = previousSnapshot.Version != MembershipVersion.MinValue + ? snapshot.CreateUpdate(previousSnapshot) + : snapshot.AsUpdate(); + var statusChanges = new List(); + foreach (var change in changes.Changes) + { + if (!change.SiloAddress.Equals(MyAddress) && change.Status.IsTerminating()) { - HandoffManager.ProcessSiloAddEvent(silo); - - AdjustLocalDirectory(silo, dead: false); - AdjustLocalCache(silo, dead: false); - - LogDebugSiloAddedSilo(MyAddress, silo); + statusChanges.Add(change); } + } - appliedClusterMembershipSnapshot = snapshot; - hasAppliedClusterMembershipSnapshot = true; + statusChanges.Sort(static (left, right) => CompareSiloAddress(left.SiloAddress, right.SiloAddress)); + foreach (var change in statusChanges) + { + OnSiloStatusChange(previousMembership, change.SiloAddress, change.Status); } } @@ -310,6 +288,22 @@ private DirectoryMembership CreateDirectoryMembership(ClusterMembershipSnapshot return DirectoryMembership.Create(members); } + private List GetMembershipDifference( + DirectoryMembership currentMembership, + DirectoryMembership otherMembership) + { + var result = new List(); + foreach (var silo in currentMembership.MembershipRingList) + { + if (!silo.Equals(MyAddress) && !otherMembership.MembershipCache.Contains(silo)) + { + result.Add(silo); + } + } + + return result; + } + private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddress updatedSilo, SiloStatus status) { if (updatedSilo.Equals(MyAddress) || !status.IsTerminating()) @@ -371,64 +365,51 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr } } - /// - /// Adjust local directory following the addition/removal of a silo - /// - private void AdjustLocalDirectory(SiloAddress silo, bool dead) + private void AdjustLocalDirectory(DirectoryMembership targetMembership) { - // Determine which activations to remove. var activationsToRemove = new List<(GrainId, ActivationId)>(); foreach (var entry in this.DirectoryPartition.GetItems()) { if (entry.Value.Activation is { } address) { - // Include any activations from dead silos and from predecessors. - if (dead && address.SiloAddress!.Equals(silo) || address.SiloAddress!.IsPredecessorOf(silo)) + if (IsDefunctActivationSilo(targetMembership, address.SiloAddress)) { activationsToRemove.Add((entry.Key, address.ActivationId)); } } } - // Remove all defunct activations. foreach (var activation in activationsToRemove) { DirectoryPartition.RemoveActivation(activation.Item1, activation.Item2); } } - /// Adjust local cache following the removal of a silo by dropping: - /// 1) entries that point to activations located on the removed silo - /// 2) entries for grains that are now owned by this silo (me) - /// 3) entries for grains that were owned by this removed silo - we currently do NOT do that. - /// If we did 3, we need to do that BEFORE we change the membershipRingList (based on old Membership). - /// We don't do that since first cache refresh handles that. - /// Second, since Membership events are not guaranteed to be ordered, we may remove a cache entry that does not really point to a failed silo. - /// To do that properly, we need to store for each cache entry who was the directory owner that registered this activation (the original partition owner). - private void AdjustLocalCache(SiloAddress silo, bool dead) + private void AdjustLocalCache(DirectoryMembership targetMembership) { - // remove all records of activations located on the removed silo foreach (var tuple in DirectoryCache.KeyValues) { var activationAddress = tuple.ActivationAddress; - // 2) remove entries now owned by me (they should be retrieved from my directory partition) - if (MyAddress.Equals(CalculateGrainDirectoryPartition(activationAddress.GrainId))) + // Remove entries now owned by me. They should be retrieved from my directory partition. + if (MyAddress.Equals(CalculateGrainDirectoryPartition(activationAddress.GrainId, targetMembership))) { DirectoryCache.Remove(activationAddress.GrainId); continue; } - // 1) remove entries that point to activations located on the removed silo - // For dead silos, remove any activation registered to that silo or one of its predecessors. - // For new silos, remove any activation registered to one of its predecessors. - if (activationAddress.SiloAddress!.IsPredecessorOf(silo) || dead && activationAddress.SiloAddress.Equals(silo)) + if (IsDefunctActivationSilo(targetMembership, activationAddress.SiloAddress)) { DirectoryCache.Remove(activationAddress.GrainId); } } } + private static bool IsDefunctActivationSilo(DirectoryMembership targetMembership, SiloAddress? silo) + { + return silo is null || !targetMembership.MembershipCache.Contains(silo); + } + internal SiloAddress? FindPredecessor(SiloAddress silo) { var existing = directoryMembership.MembershipRingList; From 737161cc78d9f86046cb5dbab6f02af482efdea5 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 16:44:38 -0700 Subject: [PATCH 04/15] Use membership versions for stale directory cleanup Evict LocalGrainDirectory directory and cache entries for terminating silos immediately, and only evict unknown-silo entries when the address was registered before the applied membership snapshot. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/LocalGrainDirectory.cs | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 154d5dcc36d..920ed15ae44 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -227,8 +227,8 @@ private void ApplyMembershipSnapshotCore(ClusterMembershipSnapshot snapshot) var addedSilos = GetMembershipDifference(targetMembership, previousMembership); ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership); - AdjustLocalDirectory(targetMembership); - AdjustLocalCache(targetMembership); + AdjustLocalDirectory(snapshot); + AdjustLocalCache(snapshot, targetMembership); foreach (var silo in removedSilos) { @@ -365,14 +365,14 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr } } - private void AdjustLocalDirectory(DirectoryMembership targetMembership) + private void AdjustLocalDirectory(ClusterMembershipSnapshot snapshot) { var activationsToRemove = new List<(GrainId, ActivationId)>(); foreach (var entry in this.DirectoryPartition.GetItems()) { if (entry.Value.Activation is { } address) { - if (IsDefunctActivationSilo(targetMembership, address.SiloAddress)) + if (IsDefunctActivation(address, snapshot)) { activationsToRemove.Add((entry.Key, address.ActivationId)); } @@ -385,7 +385,7 @@ private void AdjustLocalDirectory(DirectoryMembership targetMembership) } } - private void AdjustLocalCache(DirectoryMembership targetMembership) + private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembership targetMembership) { foreach (var tuple in DirectoryCache.KeyValues) { @@ -398,16 +398,26 @@ private void AdjustLocalCache(DirectoryMembership targetMembership) continue; } - if (IsDefunctActivationSilo(targetMembership, activationAddress.SiloAddress)) + if (IsDefunctActivation(activationAddress, snapshot)) { DirectoryCache.Remove(activationAddress.GrainId); } } } - private static bool IsDefunctActivationSilo(DirectoryMembership targetMembership, SiloAddress? silo) + private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) { - return silo is null || !targetMembership.MembershipCache.Contains(silo); + if (address.SiloAddress is not { } silo) + { + return true; + } + + if (snapshot.Members.TryGetValue(silo, out var member)) + { + return member.Status.IsTerminating(); + } + + return address.MembershipVersion < snapshot.Version; } internal SiloAddress? FindPredecessor(SiloAddress silo) From 6e548f3c96b71c0f36bc199362ed675b286849ea Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 16:56:24 -0700 Subject: [PATCH 05/15] Refresh membership before address RPC routing Refresh and apply cluster membership snapshots when grain directory RPCs receive GrainAddress values from a newer membership version before making ownership decisions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/LocalGrainDirectory.cs | 56 ++++++++++++++++++- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 920ed15ae44..a98355043e5 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -304,6 +304,48 @@ private List GetMembershipDifference( return result; } + private Task RefreshMembershipIfNewer(GrainAddress address, GrainAddress? previousAddress = null) + { + var targetVersion = address.MembershipVersion; + if (previousAddress is not null && previousAddress.MembershipVersion > targetVersion) + { + targetVersion = previousAddress.MembershipVersion; + } + + return RefreshMembershipIfNewer(targetVersion); + } + + private Task RefreshMembershipIfNewer(List addresses) + { + var targetVersion = MembershipVersion.MinValue; + foreach (var address in addresses) + { + if (address.MembershipVersion > targetVersion) + { + targetVersion = address.MembershipVersion; + } + } + + return RefreshMembershipIfNewer(targetVersion); + } + + private async Task RefreshMembershipIfNewer(MembershipVersion targetVersion) + { + if (targetVersion <= appliedClusterMembershipSnapshot.Version) + { + return; + } + + var snapshot = clusterMembershipService.CurrentSnapshot; + if (targetVersion > snapshot.Version) + { + await clusterMembershipService.Refresh(targetVersion); + snapshot = clusterMembershipService.CurrentSnapshot; + } + + await ApplyMembershipSnapshot(snapshot); + } + private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddress updatedSilo, SiloStatus status) { if (updatedSilo.Equals(MyAddress) || !status.IsTerminating()) @@ -564,6 +606,8 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres DirectoryInstruments.RegistrationsSingleActIssued.Add(1); } + await RefreshMembershipIfNewer(address, previousAddress); + // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); @@ -614,20 +658,22 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres } } - public Task UnregisterAfterNonexistingActivation(GrainAddress addr, SiloAddress origin) + public async Task UnregisterAfterNonexistingActivation(GrainAddress addr, SiloAddress origin) { LogTraceUnregisterAfterNonexistingActivation(addr, origin); + await RefreshMembershipIfNewer(addr); + if (origin == null || this.directoryMembership.MembershipCache.Contains(origin)) { // the request originated in this cluster, call unregister here - return UnregisterAsync(addr, UnregistrationCause.NonexistentActivation, 0); + await UnregisterAsync(addr, UnregistrationCause.NonexistentActivation, 0); } else { // the request originated in another cluster, call unregister there var remoteDirectory = GetDirectoryReference(origin); - return remoteDirectory.UnregisterAsync(addr, UnregistrationCause.NonexistentActivation); + await remoteDirectory.UnregisterAsync(addr, UnregistrationCause.NonexistentActivation); } } @@ -645,6 +691,8 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus if (hopCount == 0) InvalidateCacheEntry(address); + await RefreshMembershipIfNewer(address); + // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); @@ -711,6 +759,8 @@ public async Task UnregisterManyAsync(List addresses, Unregistrati DirectoryInstruments.UnregistrationsManyIssued.Add(1); } + await RefreshMembershipIfNewer(addresses); + Dictionary>? forwardlist = null; UnregisterOrPutInForwardList(addresses, cause, hopCount, ref forwardlist, "UnregisterManyAsync"); From fdca39973d5dbff7913db21ad02971eb513212fa Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 17:32:50 -0700 Subject: [PATCH 06/15] Simplify LocalGrainDirectory membership updates Apply the latest cluster membership snapshot directly, remove the unused IsSiloInCluster contract, and simplify related LocalGrainDirectory expressions. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/ILocalGrainDirectory.cs | 7 -- .../GrainDirectory/LocalGrainDirectory.cs | 111 ++++++++---------- 2 files changed, 48 insertions(+), 70 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs index 755f355ccb4..c5e0f07f44f 100644 --- a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs @@ -95,12 +95,5 @@ internal interface ILocalGrainDirectory : IDhtGrainDirectory /// Attempts to find the specified grain in the directory cache. /// bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address); - - /// - /// For determining message forwarding logic, we sometimes check if a silo is part of this cluster or not - /// - /// the address of the silo - /// true if the silo is known to be part of this cluster - bool IsSiloInCluster(SiloAddress silo); } } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index a98355043e5..ef95ce52b65 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -107,11 +107,11 @@ public LocalGrainDirectory( DirectoryInstruments.RegisterDirectoryPartitionSizeObserve(() => DirectoryPartition.Count); DirectoryInstruments.RegisterMyPortionRingDistanceObserve(() => RingDistanceToSuccessor()); - DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => (((float)this.RingDistanceToSuccessor()) / ((float)(int.MaxValue * 2L))) * 100); + DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => this.RingDistanceToSuccessor() / (float)(int.MaxValue * 2L) * 100); DirectoryInstruments.RegisterMyPortionAverageRingPercentageObserve(() => { var ring = this.directoryMembership.MembershipRingList; - return ring.Count == 0 ? 0 : ((float)100 / (float)ring.Count); + return ring.Count == 0 ? 0 : (100 / (float)ring.Count); }); DirectoryInstruments.RegisterRingSizeObserve(() => this.directoryMembership.MembershipRingList.Count); _serviceProvider = serviceProvider; @@ -157,17 +157,16 @@ public async Task StopAsync() private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) { - var snapshot = clusterMembershipService.CurrentSnapshot; while (!cancellationToken.IsCancellationRequested) { try { - await ApplyMembershipSnapshot(snapshot); + await ApplyMembershipSnapshot(); - await foreach (var update in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken)) + await foreach (var _ in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken)) { - snapshot = update; - await ApplyMembershipSnapshot(snapshot); + // Always apply the latest snapshot. + await ApplyMembershipSnapshot(); } } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) @@ -191,58 +190,58 @@ private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) LogWarningRefreshingClusterMembershipFailed(refreshException); } - snapshot = clusterMembershipService.CurrentSnapshot; await Task.Delay(RETRY_DELAY, cancellationToken).SuppressThrowing(); } } } - private Task ApplyMembershipSnapshot(ClusterMembershipSnapshot snapshot) + private Task ApplyMembershipSnapshot() { return CacheValidator.RunOrQueueTask(() => { - ApplyMembershipSnapshotCore(snapshot); + ApplyMembershipSnapshotCore(); return Task.CompletedTask; }); - } - private void ApplyMembershipSnapshotCore(ClusterMembershipSnapshot snapshot) - { - if (!Running) + void ApplyMembershipSnapshotCore() { - return; - } + if (!Running) + { + return; + } - var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default; - if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version) - { - return; - } + var snapshot = clusterMembershipService.CurrentSnapshot; + var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default; + if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version) + { + return; + } - var previousMembership = CreateDirectoryMembership(previousSnapshot); - var targetMembership = CreateDirectoryMembership(snapshot); - this.directoryMembership = targetMembership; + var previousMembership = CreateDirectoryMembership(previousSnapshot); + var targetMembership = CreateDirectoryMembership(snapshot); + this.directoryMembership = targetMembership; - var removedSilos = GetMembershipDifference(previousMembership, targetMembership); - var addedSilos = GetMembershipDifference(targetMembership, previousMembership); + var removedSilos = GetMembershipDifference(previousMembership, targetMembership); + var addedSilos = GetMembershipDifference(targetMembership, previousMembership); - ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership); - AdjustLocalDirectory(snapshot); - AdjustLocalCache(snapshot, targetMembership); + ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership); + AdjustLocalDirectory(snapshot); + AdjustLocalCache(snapshot, targetMembership); - foreach (var silo in removedSilos) - { - LogDebugSiloRemovedSilo(MyAddress, silo); - } + foreach (var silo in removedSilos) + { + LogDebugSiloRemovedSilo(MyAddress, silo); + } - foreach (var silo in addedSilos) - { - HandoffManager.ProcessSiloAddEvent(silo); - LogDebugSiloAddedSilo(MyAddress, silo); - } + foreach (var silo in addedSilos) + { + HandoffManager.ProcessSiloAddEvent(silo); + LogDebugSiloAddedSilo(MyAddress, silo); + } - appliedClusterMembershipSnapshot = snapshot; - hasAppliedClusterMembershipSnapshot = true; + appliedClusterMembershipSnapshot = snapshot; + hasAppliedClusterMembershipSnapshot = true; + } } private void ProcessSiloStatusChanges( @@ -336,14 +335,12 @@ private async Task RefreshMembershipIfNewer(MembershipVersion targetVersion) return; } - var snapshot = clusterMembershipService.CurrentSnapshot; - if (targetVersion > snapshot.Version) + if (targetVersion > clusterMembershipService.CurrentSnapshot.Version) { await clusterMembershipService.Refresh(targetVersion); - snapshot = clusterMembershipService.CurrentSnapshot; } - await ApplyMembershipSnapshot(snapshot); + await ApplyMembershipSnapshot(); } private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddress updatedSilo, SiloStatus status) @@ -456,7 +453,7 @@ private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipS if (snapshot.Members.TryGetValue(silo, out var member)) { - return member.Status.IsTerminating(); + return member.Status == SiloStatus.Dead; } return address.MembershipVersion < snapshot.Version; @@ -747,7 +744,6 @@ private void UnregisterOrPutInForwardList(List addresses, Unregist } } - public async Task UnregisterManyAsync(List addresses, UnregistrationCause cause, int hopCount) { if (hopCount > 0) @@ -1005,11 +1001,6 @@ private static int CompareSiloAddress(SiloAddress left, SiloAddress right) return hashComparison != 0 ? hashComparison : left.CompareTo(right); } - public bool IsSiloInCluster(SiloAddress silo) - { - return this.directoryMembership.MembershipCache.Contains(silo); - } - public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) => this.DirectoryCache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); public bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address) => (address = GetLocalCacheData(grainId)) is not null; void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) @@ -1193,27 +1184,21 @@ private readonly struct SiloHashLogValue(SiloAddress? silo) )] private partial void LogWarningDeleteGrainAsyncNotOwner(GrainId grainId, SiloAddress? forwardAddress, int hopCount); - private class DirectoryMembership + private class DirectoryMembership(ImmutableList membershipRingList, ImmutableHashSet membershipCache) { - public DirectoryMembership(ImmutableList membershipRingList, ImmutableHashSet membershipCache) - { - this.MembershipRingList = membershipRingList; - this.MembershipCache = membershipCache; - } - - public static DirectoryMembership Default { get; } = new DirectoryMembership(ImmutableList.Empty, ImmutableHashSet.Empty); + public static DirectoryMembership Default { get; } = new DirectoryMembership([], []); public static DirectoryMembership Create(IEnumerable members) { var builder = ImmutableList.CreateBuilder(); builder.AddRange(members); - builder.Sort(LocalGrainDirectory.CompareSiloAddress); + builder.Sort(CompareSiloAddress); var ring = builder.ToImmutable(); - return new DirectoryMembership(ring, ring.ToImmutableHashSet()); + return new DirectoryMembership(ring, [.. ring]); } - public ImmutableList MembershipRingList { get; } - public ImmutableHashSet MembershipCache { get; } + public ImmutableList MembershipRingList { get; } = membershipRingList; + public ImmutableHashSet MembershipCache { get; } = membershipCache; } } } From 5e0fd905c6022760115e003e337b4043f688404c Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 17:34:39 -0700 Subject: [PATCH 07/15] Document defunct activation cleanup Clarify why LocalGrainDirectory removes dead-silo activations and stale unknown-silo activations during membership reconciliation. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index ef95ce52b65..483a1595a65 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -453,9 +453,14 @@ private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipS if (snapshot.Members.TryGetValue(silo, out var member)) { + // If this is a known host, remove the activation if the host is dead. return member.Status == SiloStatus.Dead; } + // If this is not a known host, remove the activation if it was registered at an older membership version. + // This indicates that the host must have been removed. + // Hosts cannot activate grains before they are active, and we ensure that we refresh the membership before processing messages, + // so this is a reliable indicator of a defunct activation. return address.MembershipVersion < snapshot.Version; } From 6bddbc359b4b0eca1ae8a6c32cd1938023c6efc0 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 17:56:00 -0700 Subject: [PATCH 08/15] Address PR review feedback Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/GrainDirectoryHandoffManager.cs | 5 ++++- .../GrainDirectory/LocalGrainDirectory.cs | 10 +++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs index 40804db0e40..bac6eb68558 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs @@ -204,7 +204,10 @@ private void EnqueueOperation(string name, object state, Func Date: Mon, 11 May 2026 18:12:16 -0700 Subject: [PATCH 09/15] Refine silo death cleanup handling Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/LocalGrainDirectory.cs | 46 +++--------- .../Networking/SiloConnectionMaintainer.cs | 6 +- .../LocalGrainDirectoryTests.cs | 71 +++++++++++++++++++ 3 files changed, 85 insertions(+), 38 deletions(-) create mode 100644 test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 8ebfca33b28..288f805e85c 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -23,7 +23,6 @@ internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ILifec private readonly IClusterMembershipService clusterMembershipService; private readonly IInternalGrainFactory grainFactory; private readonly ActivationDirectory localActivations; - private readonly InsideRuntimeClient runtimeClient; private readonly IServiceProvider _serviceProvider; private readonly CancellationTokenSource _membershipUpdatesCancellation = new(); private DirectoryMembership directoryMembership = DirectoryMembership.Default; @@ -70,7 +69,6 @@ public LocalGrainDirectory( this.clusterMembershipService = clusterMembershipService; this.grainFactory = grainFactory; this.localActivations = systemTargetShared.ActivationDirectory; - this.runtimeClient = systemTargetShared.RuntimeClient; DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value, out this.disposeDirectoryCache); @@ -303,17 +301,6 @@ private List GetMembershipDifference( return result; } - private Task RefreshMembershipIfNewer(GrainAddress address, GrainAddress? previousAddress = null) - { - var targetVersion = address.MembershipVersion; - if (previousAddress is not null && previousAddress.MembershipVersion > targetVersion) - { - targetVersion = previousAddress.MembershipVersion; - } - - return RefreshMembershipIfNewer(targetVersion); - } - private Task RefreshMembershipIfNewer(List addresses) { var targetVersion = MembershipVersion.MinValue; @@ -350,11 +337,6 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr return; } - if (status == SiloStatus.Dead) - { - runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo); - } - var activationsToShutdown = new List(); var resolver = grainDirectoryResolver ??= _serviceProvider.GetRequiredService(); foreach (var activation in localActivations) @@ -444,24 +426,14 @@ private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembe } } - private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) + internal static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) { if (address.SiloAddress is not { } silo) { return true; } - if (snapshot.Members.TryGetValue(silo, out var member)) - { - // If this is a known host, remove the activation if the host is dead. - return member.Status == SiloStatus.Dead; - } - - // If this is not a known host, remove the activation if it was registered at an older membership version. - // This indicates that the host must have been removed. - // Hosts cannot activate grains before they are active, and we ensure that we refresh the membership before processing messages, - // so this is a reliable indicator of a defunct activation. - return address.MembershipVersion < snapshot.Version; + return snapshot.GetSiloStatus(silo) == SiloStatus.Dead; } internal SiloAddress? FindPredecessor(SiloAddress silo) @@ -608,13 +580,13 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres DirectoryInstruments.RegistrationsSingleActIssued.Add(1); } - await RefreshMembershipIfNewer(address, previousAddress); + await RefreshMembershipIfNewer(address.MembershipVersion); // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // After the first forward, we insert a retry delay and recheck owner before forwarding again + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); @@ -664,7 +636,7 @@ public async Task UnregisterAfterNonexistingActivation(GrainAddress addr, SiloAd { LogTraceUnregisterAfterNonexistingActivation(addr, origin); - await RefreshMembershipIfNewer(addr); + await RefreshMembershipIfNewer(addr.MembershipVersion); if (origin == null || this.directoryMembership.MembershipCache.Contains(origin)) { @@ -693,13 +665,13 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus if (hopCount == 0) InvalidateCacheEntry(address); - await RefreshMembershipIfNewer(address); + await RefreshMembershipIfNewer(address.MembershipVersion); // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // After the first forward, we insert a retry delay and recheck owner before forwarding again + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); diff --git a/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs b/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs index c276aaf0adf..4eb336b1415 100644 --- a/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs +++ b/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs @@ -9,15 +9,18 @@ internal partial class SiloConnectionMaintainer : ILifecycleParticipant log; public SiloConnectionMaintainer( ConnectionManager connectionManager, ISiloStatusOracle siloStatusOracle, + IRuntimeClient runtimeClient, ILogger log) { this.connectionManager = connectionManager; this.siloStatusOracle = siloStatusOracle; + this.runtimeClient = runtimeClient; this.log = log; } @@ -42,6 +45,7 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta { if (status == SiloStatus.Dead && updatedSilo != siloStatusOracle.SiloAddress) { + this.runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo); _ = Task.Run(() => this.CloseConnectionAsync(updatedSilo)); } } @@ -68,4 +72,4 @@ private async Task CloseConnectionAsync(SiloAddress silo) )] private static partial void LogExceptionWhileClosingConnections(ILogger logger, SiloAddress siloAddress, Exception exception); } -} \ No newline at end of file +} diff --git a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs new file mode 100644 index 00000000000..cc21b2e2ab9 --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs @@ -0,0 +1,71 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Orleans.Runtime.GrainDirectory; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("GrainDirectory")] +public class LocalGrainDirectoryTests +{ + [Theory] + [InlineData(SiloStatus.Active)] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public void IsDefunctActivation_DoesNotRemoveNonDeadSilos(SiloStatus status) + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo); + var snapshot = CreateSnapshot(new ClusterMember(silo, status, "silo"), version: 2); + + Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesDeadSilos() + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo); + var snapshot = CreateSnapshot(new ClusterMember(silo, SiloStatus.Dead, "silo"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_DoesNotRemoveUnknownSiloUntilKnownDead() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 1); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var address = CreateGrainAddress(silo, membershipVersion: 1); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static GrainAddress CreateGrainAddress(SiloAddress siloAddress, long membershipVersion = 1) + => new() + { + GrainId = GrainId.Create("test-grain", Guid.NewGuid().ToString("N")), + ActivationId = ActivationId.NewId(), + SiloAddress = siloAddress, + MembershipVersion = new MembershipVersion(membershipVersion) + }; + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +} From 4ed7dfe8604fd3158c192b9c15a75e29de539539 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 20:13:43 -0700 Subject: [PATCH 10/15] Preserve membership version guard for cleanup Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/LocalGrainDirectory.cs | 2 +- .../LocalGrainDirectoryTests.cs | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 288f805e85c..1c6ea715716 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -433,7 +433,7 @@ internal static bool IsDefunctActivation(GrainAddress address, ClusterMembership return true; } - return snapshot.GetSiloStatus(silo) == SiloStatus.Dead; + return address.MembershipVersion < snapshot.Version && snapshot.GetSiloStatus(silo) == SiloStatus.Dead; } internal SiloAddress? FindPredecessor(SiloAddress silo) diff --git a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs index cc21b2e2ab9..e8a922028d1 100644 --- a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs +++ b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs @@ -32,6 +32,16 @@ public void IsDefunctActivation_RemovesDeadSilos() Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); } + [Fact] + public void IsDefunctActivation_DoesNotRemoveDeadSiloWithoutNewerMembershipVersion() + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(silo, SiloStatus.Dead, "silo"), version: 2); + + Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + [Fact] public void IsDefunctActivation_DoesNotRemoveUnknownSiloUntilKnownDead() { From 1e9f516dc2a1ca7035a23fdff69929ed923b8bae Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 20:17:26 -0700 Subject: [PATCH 11/15] Correct defunct activation predicate Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/LocalGrainDirectory.cs | 3 ++- .../LocalGrainDirectoryTests.cs | 13 +++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 1c6ea715716..aa2a9a57f0e 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -433,7 +433,8 @@ internal static bool IsDefunctActivation(GrainAddress address, ClusterMembership return true; } - return address.MembershipVersion < snapshot.Version && snapshot.GetSiloStatus(silo) == SiloStatus.Dead; + var status = snapshot.GetSiloStatus(silo); + return status == SiloStatus.Dead || (status == SiloStatus.None && address.MembershipVersion < snapshot.Version); } internal SiloAddress? FindPredecessor(SiloAddress silo) diff --git a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs index e8a922028d1..3b820aec7d7 100644 --- a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs +++ b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs @@ -26,31 +26,32 @@ public void IsDefunctActivation_DoesNotRemoveNonDeadSilos(SiloStatus status) public void IsDefunctActivation_RemovesDeadSilos() { var silo = CreateSiloAddress(1); - var address = CreateGrainAddress(silo); + var address = CreateGrainAddress(silo, membershipVersion: 2); var snapshot = CreateSnapshot(new ClusterMember(silo, SiloStatus.Dead, "silo"), version: 2); Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); } [Fact] - public void IsDefunctActivation_DoesNotRemoveDeadSiloWithoutNewerMembershipVersion() + public void IsDefunctActivation_DoesNotRemoveUnknownSiloWithoutNewerMembershipVersion() { var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); var address = CreateGrainAddress(silo, membershipVersion: 2); - var snapshot = CreateSnapshot(new ClusterMember(silo, SiloStatus.Dead, "silo"), version: 2); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); } [Fact] - public void IsDefunctActivation_DoesNotRemoveUnknownSiloUntilKnownDead() + public void IsDefunctActivation_RemovesUnknownSiloWithOlderMembershipVersion() { var silo = CreateSiloAddress(1); var unrelatedSilo = CreateSiloAddress(1, port: 11112); var address = CreateGrainAddress(silo, membershipVersion: 1); var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); - Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); } [Fact] @@ -58,7 +59,7 @@ public void IsDefunctActivation_RemovesSiloReplacedBySuccessor() { var silo = CreateSiloAddress(1); var successor = CreateSiloAddress(2); - var address = CreateGrainAddress(silo, membershipVersion: 1); + var address = CreateGrainAddress(silo, membershipVersion: 2); var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); From 6ba08939f0b3901d4be3d070ee350b144b970047 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 20:56:07 -0700 Subject: [PATCH 12/15] Use version-aware silo status checks Use ClusterMembershipSnapshot.GetSiloStatus with registration membership versions when deciding whether grain directory entries are dead. This keeps shutting down and stopping silos valid until they are marked dead while still filtering old unknown or replaced silos. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/CachedGrainLocator.cs | 20 +--- .../GrainDirectoryHandoffManager.cs | 19 +++- .../GrainDirectoryPartition.Interface.cs | 3 +- .../GrainDirectory/LocalGrainDirectory.cs | 19 ++-- .../LocalGrainDirectoryPartition.cs | 23 ++--- .../ClusterMembershipSnapshot.cs | 12 +++ .../SiloStatusListenerManager.cs | 2 +- src/api/Orleans.Runtime/Orleans.Runtime.cs | 2 + .../Directory/CachedGrainLocatorTests.cs | 31 +++++- .../ClusterMembershipSnapshotTests.cs | 48 ++++++++++ .../GrainDirectoryHandoffManagerTests.cs | 73 ++++++++++++++ .../GrainDirectoryPartitionTests.cs | 96 +++++++++---------- 12 files changed, 256 insertions(+), 92 deletions(-) create mode 100644 test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs create mode 100644 test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs diff --git a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs index 8314c99c0ed..c11625dde19 100644 --- a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs @@ -169,10 +169,10 @@ private async Task ListenToClusterChange() var updates = this.clusterMembershipService.MembershipUpdates.WithCancellation(this.shutdownToken.Token); await foreach (var snapshot in updates) { - // Active filtering: detect silos that went down and try to clean proactively the directory + // Active filtering: detect dead silos and try to clean proactively the directory var changes = snapshot.CreateUpdate(previousSnapshot).Changes; var deadSilos = changes - .Where(member => member.Status.IsTerminating()) + .Where(member => member.Status == SiloStatus.Dead) .Select(member => member.SiloAddress) .ToList(); @@ -196,17 +196,7 @@ private bool IsKnownDeadSilo(GrainAddress grainAddress) private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membershipVersion) { var current = this.clusterMembershipService.CurrentSnapshot; - - // Check if the target silo is in the cluster - if (current.Members.TryGetValue(siloAddress, out var value)) - { - // It is, check if it's alive - return value.Status.IsTerminating(); - } - - // We didn't find it in the cluster. If the silo entry is too old, it has been cleaned in the membership table: the entry isn't valid anymore. - // Otherwise, maybe the membership service isn't up to date yet. The entry should be valid - return current.Version > membershipVersion; + return siloAddress is null || current.GetSiloStatus(siloAddress, membershipVersion) == SiloStatus.Dead; } private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}"); @@ -222,10 +212,10 @@ public bool TryLookupInCache(GrainId grainId, out GrainAddress address) ThrowUnsupportedGrainType(grainId); } - if (this.cache.LookUp(grainId, out address, out var version)) + if (this.cache.LookUp(grainId, out address, out _)) { // If the silo is dead, remove the entry - if (IsKnownDeadSilo(address.SiloAddress, new MembershipVersion(version))) + if (IsKnownDeadSilo(address)) { address = default; this.cache.Remove(grainId); diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs index bac6eb68558..9a79a54fba0 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs @@ -17,24 +17,24 @@ internal sealed partial class GrainDirectoryHandoffManager private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(250); private readonly LocalGrainDirectory localDirectory; private readonly ISiloStatusOracle siloStatusOracle; + private readonly IClusterMembershipService clusterMembershipService; private readonly IInternalGrainFactory grainFactory; private readonly ILogger logger; - private readonly Factory createPartion; private readonly Queue<(string name, object state, Func action)> pendingOperations = new(); private readonly AsyncLock executorLock = new AsyncLock(); internal GrainDirectoryHandoffManager( LocalGrainDirectory localDirectory, ISiloStatusOracle siloStatusOracle, + IClusterMembershipService clusterMembershipService, IInternalGrainFactory grainFactory, - Factory createPartion, ILoggerFactory loggerFactory) { logger = loggerFactory.CreateLogger(); this.localDirectory = localDirectory; this.siloStatusOracle = siloStatusOracle; + this.clusterMembershipService = clusterMembershipService; this.grainFactory = grainFactory; - this.createPartion = createPartion; } internal void ProcessSiloAddEvent(SiloAddress addedSilo) @@ -128,9 +128,10 @@ private async Task AcceptExistingRegistrationsAsync(List singleAct { if (!this.localDirectory.Running) return; + var snapshot = this.clusterMembershipService.CurrentSnapshot; for (var i = singleActivations.Count - 1; i >= 0; i--) { - if (singleActivations[i].SiloAddress is not { } siloAddress || this.siloStatusOracle.GetApproximateSiloStatus(siloAddress).IsTerminating()) + if (!IsTransferableRegistration(singleActivations[i], snapshot)) { singleActivations.RemoveAt(i); } @@ -199,6 +200,16 @@ private async Task DestroyDuplicateActivationsAsync(Dictionary action) { lock (this) diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs index 4e008eadc98..7b6db82ce39 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs @@ -111,7 +111,8 @@ private GrainAddress RegisterCore(GrainAddress newAddress, GrainAddress? existin return existing; } - private bool IsSiloDead(GrainAddress existing) => _owner.ClusterMembershipSnapshot.GetSiloStatus(existing.SiloAddress) == SiloStatus.Dead; + private bool IsSiloDead(GrainAddress existing) + => existing.SiloAddress is null || _owner.ClusterMembershipSnapshot.GetSiloStatus(existing.SiloAddress, existing.MembershipVersion) == SiloStatus.Dead; [LoggerMessage( Level = LogLevel.Trace, diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index aa2a9a57f0e..6e228cf583c 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -79,7 +79,7 @@ public LocalGrainDirectory( } DirectoryPartition = grainDirectoryPartitionFactory(); - HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, grainFactory, grainDirectoryPartitionFactory, loggerFactory); + HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, clusterMembershipService, grainFactory, loggerFactory); // When DistributedGrainDirectory is active, it registers its own IRemoteGrainDirectory system targets. // In that case, create the RemoteGrainDirectory objects (still needed for WorkItemGroup scheduling) @@ -433,8 +433,7 @@ internal static bool IsDefunctActivation(GrainAddress address, ClusterMembership return true; } - var status = snapshot.GetSiloStatus(silo); - return status == SiloStatus.Dead || (status == SiloStatus.None && address.MembershipVersion < snapshot.Version); + return snapshot.GetSiloStatus(silo, address.MembershipVersion) == SiloStatus.Dead; } internal SiloAddress? FindPredecessor(SiloAddress silo) @@ -624,7 +623,7 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres // this way next local lookup will find this ActivationAddress in the cache and we will save a full lookup! if (result.Address == null) return result; - if (!address.Equals(result.Address) || !IsValidSilo(address.SiloAddress)) return result; + if (!address.Equals(result.Address) || IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) return result; // update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup. DirectoryCache.AddOrUpdate(result.Address, result.VersionTag); @@ -829,7 +828,15 @@ public bool LocalLookup(GrainId grain, out AddressAndTag result) public AddressAndTag GetLocalDirectoryData(GrainId grain) => DirectoryPartition.LookUpActivation(grain); - public GrainAddress? GetLocalCacheData(GrainId grain) => DirectoryCache.LookUp(grain, out var cache) && IsValidSilo(cache.SiloAddress) ? cache : null; + public GrainAddress? GetLocalCacheData(GrainId grain) + { + if (!DirectoryCache.LookUp(grain, out var cache)) + { + return null; + } + + return IsDefunctActivation(cache, clusterMembershipService.CurrentSnapshot) ? null : cache; + } public async Task LookupAsync(GrainId grainId, int hopCount = 0) { @@ -890,7 +897,7 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) var result = await GetDirectoryReference(forwardAddress).LookupAsync(grainId, hopCount + 1); // update the cache - if (result.Address is { } address && IsValidSilo(address.SiloAddress)) + if (result.Address is { } address && !IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) { DirectoryCache.AddOrUpdate(address, result.VersionTag); } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs index f27d5e15c86..90c6d752f10 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs @@ -112,21 +112,22 @@ internal sealed partial class LocalGrainDirectoryPartition private Dictionary partitionData; private readonly object lockable; private readonly ILogger log; - private readonly ISiloStatusOracle siloStatusOracle; + private readonly IClusterMembershipService clusterMembershipService; private readonly IOptions grainDirectoryOptions; internal int Count { get { return partitionData.Count; } } - public LocalGrainDirectoryPartition(ISiloStatusOracle siloStatusOracle, IOptions grainDirectoryOptions, ILoggerFactory loggerFactory) + public LocalGrainDirectoryPartition(IClusterMembershipService clusterMembershipService, IOptions grainDirectoryOptions, ILoggerFactory loggerFactory) { partitionData = new Dictionary(); lockable = new object(); log = loggerFactory.CreateLogger(); - this.siloStatusOracle = siloStatusOracle; + this.clusterMembershipService = clusterMembershipService; this.grainDirectoryOptions = grainDirectoryOptions; } - private bool IsValidSilo(SiloAddress? silo) => silo is not null && siloStatusOracle.IsFunctionalDirectory(silo); + private bool IsDefunctActivation(GrainAddress? address) + => address is null || LocalGrainDirectory.IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot); internal void Clear() { @@ -156,9 +157,11 @@ internal AddressAndTag AddSingleActivation(GrainAddress address, GrainAddress? p { LogTraceAddingSingleActivation(address.SiloAddress, address.GrainId, address.ActivationId); - if (!IsValidSilo(address.SiloAddress)) + if (IsDefunctActivation(address)) { - var siloStatus = this.siloStatusOracle.GetApproximateSiloStatus(address.SiloAddress); + var siloStatus = address.SiloAddress is { } siloAddress + ? this.clusterMembershipService.CurrentSnapshot.GetSiloStatus(siloAddress, address.MembershipVersion) + : SiloStatus.None; throw new OrleansException($"Trying to register {address.GrainId} on invalid silo: {address.SiloAddress}. Known status: {siloStatus}"); } @@ -170,10 +173,8 @@ internal AddressAndTag AddSingleActivation(GrainAddress address, GrainAddress? p } else { - var siloAddress = grainInfo.Activation?.SiloAddress; - // If there is an existing entry pointing to an invalid silo then remove it - if (siloAddress != null && !IsValidSilo(siloAddress)) + if (IsDefunctActivation(grainInfo.Activation)) { partitionData[address.GrainId] = grainInfo = new GrainInfo(); } @@ -230,7 +231,7 @@ internal AddressAndTag LookUpActivation(GrainId grain) result = new(grainInfo.Activation, grainInfo.VersionTag); } - if (!IsValidSilo(result.Address?.SiloAddress)) + if (IsDefunctActivation(result.Address)) { result = new(null, result.VersionTag); } @@ -308,7 +309,7 @@ internal List Split(Predicate predicate) for (var i = result.Count - 1; i >= 0; i--) { - if (!IsValidSilo(result[i].SiloAddress)) + if (IsDefunctActivation(result[i])) { result.RemoveAt(i); } diff --git a/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs b/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs index e69dfbd7145..1e12c444632 100644 --- a/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs +++ b/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs @@ -61,6 +61,18 @@ public SiloStatus GetSiloStatus(SiloAddress silo) return status; } + /// + /// Gets status of the specified silo, treating unknown silos as dead if this snapshot is newer than when the silo was seen. + /// + /// The silo. + /// The membership version when the silo was last seen. + /// The status of the specified silo. + public SiloStatus GetSiloStatus(SiloAddress silo, MembershipVersion seenAtVersion) + { + var status = GetSiloStatus(silo); + return status == SiloStatus.None && this.Version > seenAtVersion ? SiloStatus.Dead : status; + } + /// /// Returns a which represents this instance. /// diff --git a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs index 2af9f4c3006..bc455ee8ab7 100644 --- a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs +++ b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs @@ -11,7 +11,7 @@ namespace Orleans.Runtime.MembershipService; /// /// Manages instances. /// -internal partial class SiloStatusListenerManager : ILifecycleParticipant +internal sealed partial class SiloStatusListenerManager : ILifecycleParticipant { #if NET9_0_OR_GREATER private readonly Lock _listenersLock = new(); diff --git a/src/api/Orleans.Runtime/Orleans.Runtime.cs b/src/api/Orleans.Runtime/Orleans.Runtime.cs index c3577927a1d..19168727c55 100644 --- a/src/api/Orleans.Runtime/Orleans.Runtime.cs +++ b/src/api/Orleans.Runtime/Orleans.Runtime.cs @@ -547,6 +547,8 @@ public ClusterMembershipSnapshot(System.Collections.Immutable.ImmutableDictionar public SiloStatus GetSiloStatus(SiloAddress silo) { throw null; } + public SiloStatus GetSiloStatus(SiloAddress silo, MembershipVersion seenAtVersion) { throw null; } + public override string ToString() { throw null; } } diff --git a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs index 066bbb1aa4d..c95b2bdc375 100644 --- a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs +++ b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs @@ -146,7 +146,7 @@ public async Task LocalGrainDirectoryStopDoesNotDisposeRegisteredCustomCache() .AddSingleton(cache) .BuildServiceProvider(); Factory partitionFactory = () => new LocalGrainDirectoryPartition( - siloStatusOracle, + membershipService.Target, Options.Create(new GrainDirectoryOptions()), this.loggerFactory); var systemTargetShared = new SystemTargetShared( @@ -388,6 +388,35 @@ public async Task LocalLookupWhenEntryExistsButSiloIsDead() await this.lifecycle.OnStop(); } + [Theory] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public async Task LocalLookupWhenCachedEntrySiloIsTerminatingButNotDead(SiloStatus status) + { + var silo = GenerateSiloAddress(); + + // Setup membership service + this.mockMembershipService.UpdateSiloStatus(silo, SiloStatus.Active, "silo"); + await this.lifecycle.OnStart(); + await WaitUntilClusterChangePropagated(); + + var address = GenerateGrainAddress(silo); + this.grainDirectory.Register(address, previousAddress: null).Returns(address); + + await this.grainLocator.Register(address, previousAddress: null); + Assert.True(this.grainLocator.TryLookupInCache(address.GrainId, out var cached)); + Assert.Equal(address, cached); + + this.mockMembershipService.UpdateSiloStatus(silo, status, "silo"); + await WaitUntilClusterChangePropagated(); + + Assert.True(this.grainLocator.TryLookupInCache(address.GrainId, out cached)); + Assert.Equal(address, cached); + await this.grainDirectory.DidNotReceive().UnregisterSilos(Arg.Any>()); + + await this.lifecycle.OnStop(); + } + /// /// Tests that the locator properly cleans up cached entries when a silo dies. /// This is critical for preventing requests from being sent to dead silos. diff --git a/test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs b/test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs new file mode 100644 index 00000000000..2cd4b65b070 --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs @@ -0,0 +1,48 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("Membership")] +public class ClusterMembershipSnapshotTests +{ + [Fact] + public void GetSiloStatus_ReturnsDeadForUnknownSiloSeenAtOlderVersion() + { + var unknownSilo = CreateSiloAddress(1); + var knownSilo = CreateSiloAddress(1, port: 11112); + var snapshot = CreateSnapshot(new ClusterMember(knownSilo, SiloStatus.Active, "known"), version: 2); + + Assert.Equal(SiloStatus.Dead, snapshot.GetSiloStatus(unknownSilo, new MembershipVersion(1))); + } + + [Theory] + [InlineData(2)] + [InlineData(3)] + public void GetSiloStatus_ReturnsNoneForUnknownSiloSeenAtCurrentOrNewerVersion(long seenAtVersion) + { + var unknownSilo = CreateSiloAddress(1); + var knownSilo = CreateSiloAddress(1, port: 11112); + var snapshot = CreateSnapshot(new ClusterMember(knownSilo, SiloStatus.Active, "known"), version: 2); + + Assert.Equal(SiloStatus.None, snapshot.GetSiloStatus(unknownSilo, new MembershipVersion(seenAtVersion))); + } + + [Fact] + public void GetSiloStatus_ReturnsDeadForSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.Equal(SiloStatus.Dead, snapshot.GetSiloStatus(silo, new MembershipVersion(2))); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +} diff --git a/test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs new file mode 100644 index 00000000000..787179ab70b --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs @@ -0,0 +1,73 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Orleans.Runtime.GrainDirectory; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("GrainDirectory")] +public class GrainDirectoryHandoffManagerTests +{ + [Theory] + [InlineData(SiloStatus.Active, true)] + [InlineData(SiloStatus.ShuttingDown, true)] + [InlineData(SiloStatus.Stopping, true)] + [InlineData(SiloStatus.Dead, false)] + public void IsTransferableRegistration_UsesSnapshotStatus(SiloStatus status, bool expected) + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(silo, status, "silo"), version: 2); + + Assert.Equal(expected, GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + [Fact] + public void IsTransferableRegistration_AllowsUnknownSiloWithoutNewerMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.True(GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + [Fact] + public void IsTransferableRegistration_RejectsUnknownSiloWithOlderMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 1); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.False(GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + [Fact] + public void IsTransferableRegistration_RejectsSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.False(GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static GrainAddress CreateGrainAddress(SiloAddress siloAddress, long membershipVersion) + => new() + { + GrainId = GrainId.Create("test-grain", Guid.NewGuid().ToString("N")), + ActivationId = ActivationId.NewId(), + SiloAddress = siloAddress, + MembershipVersion = new MembershipVersion(membershipVersion) + }; + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +} diff --git a/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs index 4f7582d2655..e9fe185227c 100644 --- a/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs +++ b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs @@ -31,15 +31,16 @@ namespace UnitTests; public class GrainDirectoryPartitionTests { private readonly LocalGrainDirectoryPartition _target; - private readonly MockSiloStatusOracle _siloStatusOracle; - private static readonly SiloAddress LocalSiloAddress = SiloAddress.FromParsableString("127.0.0.1:11111@123"); - private static readonly SiloAddress OtherSiloAddress = SiloAddress.FromParsableString("127.0.0.2:11111@456"); + private readonly MockClusterMembershipService _clusterMembershipService; + private static readonly SiloAddress LocalSiloAddress = SiloAddress.FromParsableString("127.0.0.1:11111@123"); + private static readonly SiloAddress OtherSiloAddress = SiloAddress.FromParsableString("127.0.0.2:11111@456"); public GrainDirectoryPartitionTests() { - _siloStatusOracle = new MockSiloStatusOracle(); + _clusterMembershipService = new MockClusterMembershipService(); + _clusterMembershipService.SetSiloStatus(LocalSiloAddress, SiloStatus.Active); _target = new LocalGrainDirectoryPartition( - _siloStatusOracle, + _clusterMembershipService, Options.Create(new GrainDirectoryOptions()), new LoggerFactory()); } @@ -53,7 +54,7 @@ public GrainDirectoryPartitionTests() [Fact] public void OverrideDeadEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var firstGrainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -62,7 +63,7 @@ public void OverrideDeadEntryTest() var firstRegister = _target.AddSingleActivation(firstGrainAddress, previousAddress: null); Assert.Equal(firstGrainAddress, firstRegister.Address); - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); // Previous entry is now pointing to a dead silo, it should be possible to override it now var secondGrainAddress = GrainAddress.NewActivationAddress(LocalSiloAddress, grainId); @@ -79,7 +80,7 @@ public void OverrideDeadEntryTest() [Fact] public void DoNotInsertInvalidEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -97,7 +98,7 @@ public void DoNotInsertInvalidEntryTest() [Fact] public void DoNotOverrideValidEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -121,7 +122,7 @@ public void DoNotOverrideValidEntryTest() [Fact] public void OverrideValidEntryIfMatchesTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -145,7 +146,7 @@ public void OverrideValidEntryIfMatchesTest() [Fact] public void DoNotOverrideValidEntryIfNoMatchTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -176,7 +177,7 @@ public void DoNotOverrideValidEntryIfNoMatchTest() [Fact] public void DoNotReturnInvalidEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress1 = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -185,65 +186,54 @@ public void DoNotReturnInvalidEntryTest() var register1 = _target.AddSingleActivation(grainAddress1, previousAddress: null); Assert.Equal(grainAddress1, register1.Address); - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); // Previous entry is no longer still valid, it should not be returned var lookup = _target.LookUpActivation(grainId); Assert.Null(lookup.Address); } - /// - /// Mock implementation of ISiloStatusOracle for testing. - /// The silo status oracle provides membership information about - /// which silos are alive, dead, or in other states. The grain - /// directory uses this to validate entries and make placement decisions. - /// - private class MockSiloStatusOracle : ISiloStatusOracle + [Theory] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public void ReturnEntryForTerminatingButNotDeadSilo(SiloStatus status) { - private readonly Dictionary _content = new(); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, status); - public MockSiloStatusOracle(SiloAddress siloAddress = null) - { - SiloAddress = siloAddress ?? LocalSiloAddress; - _content[SiloAddress] = SiloStatus.Active; - } + var grainId = GrainId.Create("testGrain", "myKey"); + var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); + + var register = _target.AddSingleActivation(grainAddress, previousAddress: null); + Assert.Equal(grainAddress, register.Address); + + var lookup = _target.LookUpActivation(grainId); + Assert.Equal(grainAddress, lookup.Address); + } - public SiloStatus CurrentStatus => SiloStatus.Active; + private sealed class MockClusterMembershipService : IClusterMembershipService + { + private readonly Dictionary _statuses = new(); + private long _version; - public string SiloName => "TestSilo"; + public ClusterMembershipSnapshot CurrentSnapshot { get; private set; } = + new(ImmutableDictionary.Empty, MembershipVersion.MinValue); - public SiloAddress SiloAddress { get; } + public IAsyncEnumerable MembershipUpdates => throw new NotImplementedException(); - public SiloStatus GetApproximateSiloStatus(SiloAddress siloAddress) + public void SetSiloStatus(SiloAddress siloAddress, SiloStatus status) { - if (_content.TryGetValue(siloAddress, out var status)) + _statuses[siloAddress] = (status, "TestSilo"); + var members = ImmutableDictionary.CreateBuilder(); + foreach (var (silo, entry) in _statuses) { - return status; + members[silo] = new ClusterMember(silo, entry.Status, entry.Name); } - return SiloStatus.None; - } - public Dictionary GetApproximateSiloStatuses(bool onlyActive = false) - { - return onlyActive - ? new Dictionary(_content.Where(kvp => kvp.Value == SiloStatus.Active)) - : new Dictionary(_content); + CurrentSnapshot = new ClusterMembershipSnapshot(members.ToImmutable(), new MembershipVersion(++_version)); } - public ImmutableArray GetActiveSilos() => _content.Keys.ToImmutableArray(); - - public void SetSiloStatus(SiloAddress siloAddress, SiloStatus status) => _content[siloAddress] = status; - - public bool IsDeadSilo(SiloAddress silo) => GetApproximateSiloStatus(silo) == SiloStatus.Dead; - - public bool IsFunctionalDirectory(SiloAddress siloAddress) => !GetApproximateSiloStatus(siloAddress).IsTerminating(); - - #region Not Implemented - public bool SubscribeToSiloStatusEvents(ISiloStatusListener observer) => throw new NotImplementedException(); - - public bool TryGetSiloName(SiloAddress siloAddress, out string siloName) => throw new NotImplementedException(); + public ValueTask Refresh(MembershipVersion minimumVersion = default, CancellationToken cancellationToken = default) => default; - public bool UnSubscribeFromSiloStatusEvents(ISiloStatusListener observer) => throw new NotImplementedException(); - #endregion + public Task TryKill(SiloAddress siloAddress) => throw new NotImplementedException(); } } From 3564900aa9fe82330a46a0727fa63209c5f12bec Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 21:03:53 -0700 Subject: [PATCH 13/15] Refine directory forwarding retry checks Only delay and recheck ownership after a request has already been forwarded once, matching the single-operation forwarding behavior. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/LocalGrainDirectory.cs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 6e228cf583c..a8365b7225d 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -738,8 +738,8 @@ public async Task UnregisterManyAsync(List addresses, Unregistrati UnregisterOrPutInForwardList(addresses, cause, hopCount, ref forwardlist, "UnregisterManyAsync"); - // before forwarding to other silos, we insert a retry delay and re-check destination - if (hopCount > 0 && forwardlist != null) + // After the first forward, we insert a retry delay and recheck owner before forwarding again + if (hopCount > 1 && forwardlist != null) { await Task.Delay(RETRY_DELAY); Dictionary>? forwardlist2 = null; @@ -852,8 +852,8 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "LookUpAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // After the first forward, we insert a retry delay and recheck owner before forwarding again + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "LookUpAsync"); @@ -913,8 +913,8 @@ public async Task DeleteGrainAsync(GrainId grainId, int hopCount) // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "DeleteGrainAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // After the first forward, we insert a retry delay and recheck owner before forwarding again + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "DeleteGrainAsync"); From 7cb3d77568ed54aced877e18e1a6ea411cf56a1e Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 21:27:33 -0700 Subject: [PATCH 14/15] Fix cached locator membership tracking Advance the previous membership snapshot after each cache cleanup pass and stamp manually cached entries with the current membership version so version-aware cache validation does not evict them immediately. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/CachedGrainLocator.cs | 7 ++- .../Directory/CachedGrainLocatorTests.cs | 45 +++++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs index c11625dde19..32cd76dc558 100644 --- a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs @@ -187,6 +187,7 @@ private async Task ListenToClusterChange() } ((ITestAccessor)this).LastMembershipVersion = snapshot.Version; + previousSnapshot = snapshot; } } @@ -201,7 +202,11 @@ private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membersh private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}"); - public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); + public void UpdateCache(GrainId grainId, SiloAddress siloAddress) + { + var membershipVersion = this.clusterMembershipService.CurrentSnapshot.Version; + cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress, MembershipVersion = membershipVersion }, (int)membershipVersion.Value); + } public void InvalidateCache(GrainId grainId) => cache.Remove(grainId); public void InvalidateCache(GrainAddress address) => cache.Remove(address); public bool TryLookupInCache(GrainId grainId, out GrainAddress address) diff --git a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs index c95b2bdc375..d15b93101c3 100644 --- a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs +++ b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs @@ -465,6 +465,51 @@ await this.grainDirectory await this.lifecycle.OnStop(); } + [Fact] + public async Task CleanupWhenSiloIsDeadOnlyProcessesIncrementalChanges() + { + var expectedSilo = GenerateSiloAddress(); + var outdatedSilo = GenerateSiloAddress(); + + this.mockMembershipService.UpdateSiloStatus(expectedSilo, SiloStatus.Active, "exp"); + this.mockMembershipService.UpdateSiloStatus(outdatedSilo, SiloStatus.Active, "old"); + await this.lifecycle.OnStart(); + await WaitUntilClusterChangePropagated(); + + this.mockMembershipService.UpdateSiloStatus(outdatedSilo, SiloStatus.Dead, "old"); + await WaitUntilClusterChangePropagated(); + + await this.grainDirectory + .Received(1) + .UnregisterSilos(Arg.Is>(list => list.Count == 1 && list.Contains(outdatedSilo))); + + this.mockMembershipService.UpdateSiloStatus(expectedSilo, SiloStatus.Active, "exp2"); + await WaitUntilClusterChangePropagated(); + + await this.grainDirectory + .Received(1) + .UnregisterSilos(Arg.Is>(list => list.Count == 1 && list.Contains(outdatedSilo))); + + await this.lifecycle.OnStop(); + } + + [Fact] + public async Task UpdateCacheStampsCurrentMembershipVersion() + { + await this.lifecycle.OnStart(); + + var grainId = GrainId.Create(GrainType.Create("test"), GrainIdKeyExtensions.CreateGuidKey(Guid.NewGuid())); + var silo = GenerateSiloAddress(); + + this.grainLocator.UpdateCache(grainId, silo); + + Assert.True(this.grainLocator.TryLookupInCache(grainId, out var cached)); + Assert.Equal(silo, cached.SiloAddress); + Assert.Equal(this.mockMembershipService.CurrentVersion, cached.MembershipVersion); + + await this.lifecycle.OnStop(); + } + [Fact] public async Task UnregisterCallDirectoryAndCleanCache() { From fa59d4886e46e60e9dd500a3f79eaf7d4db0b175 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 11 May 2026 21:28:08 -0700 Subject: [PATCH 15/15] Clarify directory forwarding retry comments Clarify that hopCount == 1 is allowed to re-forward immediately and the retry delay applies only after the request has already bounced through another owner. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../GrainDirectory/LocalGrainDirectory.cs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index a8365b7225d..8a83e8f49c1 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -585,7 +585,8 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); - // After the first forward, we insert a retry delay and recheck owner before forwarding again + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); @@ -670,7 +671,8 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); - // After the first forward, we insert a retry delay and recheck owner before forwarding again + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); @@ -738,7 +740,8 @@ public async Task UnregisterManyAsync(List addresses, Unregistrati UnregisterOrPutInForwardList(addresses, cause, hopCount, ref forwardlist, "UnregisterManyAsync"); - // After the first forward, we insert a retry delay and recheck owner before forwarding again + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. if (hopCount > 1 && forwardlist != null) { await Task.Delay(RETRY_DELAY); @@ -852,7 +855,8 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "LookUpAsync"); - // After the first forward, we insert a retry delay and recheck owner before forwarding again + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); @@ -913,7 +917,8 @@ public async Task DeleteGrainAsync(GrainId grainId, int hopCount) // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "DeleteGrainAsync"); - // After the first forward, we insert a retry delay and recheck owner before forwarding again + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY);