diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index 0eff3079ee0..dfca80b796d 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -11,9 +11,7 @@ namespace Orleans.Runtime { internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant { - private readonly SiloAddress _siloAddress; private readonly ActivationCollector activationCollector; - private readonly GrainDirectoryResolver grainDirectoryResolver; private readonly ActivationDirectory activations; private readonly IServiceProvider serviceProvider; private readonly ILogger logger; @@ -30,8 +28,6 @@ internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecyclePartic #endif public Catalog( - ILocalSiloDetails localSiloDetails, - GrainDirectoryResolver grainDirectoryResolver, ActivationDirectory activationDirectory, ActivationCollector activationCollector, IServiceProvider serviceProvider, @@ -40,8 +36,6 @@ public Catalog( SystemTargetShared shared) : base(Constants.CatalogType, shared) { - this._siloAddress = localSiloDetails.SiloAddress; - this.grainDirectoryResolver = grainDirectoryResolver; this.activations = activationDirectory; this.serviceProvider = serviceProvider; this.grainActivator = grainActivator; @@ -328,99 +322,12 @@ await Parallel.ForEachAsync(addresses, (activationAddress, cancellationToken) => }); } - // TODO move this logic in the LocalGrainDirectory - internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress updatedSilo, SiloStatus status) - { - // ignore joining events and also events on myself. - if (updatedSilo.Equals(_siloAddress)) return; - - // We deactivate those activations when silo goes either of ShuttingDown/Stopping/Dead states, - // since this is what Directory is doing as well. Directory removes a silo based on all those 3 statuses, - // thus it will only deliver a "remove" notification for a given silo once to us. Therefore, we need to react the fist time we are notified. - // We may review the directory behavior in the future and treat ShuttingDown differently ("drain only") and then this code will have to change a well. - if (!status.IsTerminating()) return; - if (status == SiloStatus.Dead) - { - this.RuntimeClient.BreakOutstandingMessagesToSilo(updatedSilo); - } - - var activationsToShutdown = new List(); - try - { - // scan all activations in activation directory and deactivate the ones that the removed silo is their primary partition owner. - // Note: No lock needed here since ActivationDirectory uses ConcurrentDictionary which provides thread-safe enumeration - foreach (var activation in activations) - { - try - { - var activationData = activation.Value; - var placementStrategy = activationData.GetComponent(); - var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true }; - if (!isUsingGrainDirectory || !grainDirectoryResolver.IsUsingDefaultDirectory(activationData.GrainId.Type)) continue; - if (!updatedSilo.Equals(directory.GetPrimaryForGrain(activationData.GrainId))) continue; - - activationsToShutdown.Add(activationData); - } - catch (Exception exc) - { - LogErrorCatalogSiloStatusChangeNotification(new(updatedSilo), exc); - } - } - - if (activationsToShutdown.Count > 0) - { - LogInfoCatalogSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo)); - } - } - finally - { - // outside the lock. - if (activationsToShutdown.Count > 0) - { - var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration."; - var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText); - StartDeactivatingActivations(reason, activationsToShutdown, CancellationToken.None); - } - } - - void StartDeactivatingActivations(DeactivationReason reason, List list, CancellationToken cancellationToken) - { - if (list == null || list.Count == 0) return; - - LogDebugDeactivateActivations(list.Count); - - foreach (var activation in list) - { - activation.Deactivate(reason, cancellationToken); - } - } - } - void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) { // Do nothing, just ensure that this instance is created so that it can register itself in the activation directory. _siloStatusOracle = serviceProvider.GetRequiredService(); } - private readonly struct SiloAddressLogValue(SiloAddress silo) - { - public override string ToString() => silo.ToStringWithHashCode(); - } - - [LoggerMessage( - Level = LogLevel.Error, - EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception, - Message = "Catalog has thrown an exception while handling removal of silo {Silo}" - )] - private partial void LogErrorCatalogSiloStatusChangeNotification(SiloAddressLogValue silo, Exception exception); - - [LoggerMessage( - Level = LogLevel.Information, - EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification, - Message = "Catalog is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids." - )] - private partial void LogInfoCatalogSiloStatusChangeNotification(int count, SiloAddressLogValue silo); - [LoggerMessage( Level = LogLevel.Trace, Message = "Unregistered activation {Activation}")] diff --git a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs index 8314c99c0ed..32cd76dc558 100644 --- a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs @@ -169,10 +169,10 @@ private async Task ListenToClusterChange() var updates = this.clusterMembershipService.MembershipUpdates.WithCancellation(this.shutdownToken.Token); await foreach (var snapshot in updates) { - // Active filtering: detect silos that went down and try to clean proactively the directory + // Active filtering: detect dead silos and try to clean proactively the directory var changes = snapshot.CreateUpdate(previousSnapshot).Changes; var deadSilos = changes - .Where(member => member.Status.IsTerminating()) + .Where(member => member.Status == SiloStatus.Dead) .Select(member => member.SiloAddress) .ToList(); @@ -187,6 +187,7 @@ private async Task ListenToClusterChange() } ((ITestAccessor)this).LastMembershipVersion = snapshot.Version; + previousSnapshot = snapshot; } } @@ -196,22 +197,16 @@ private bool IsKnownDeadSilo(GrainAddress grainAddress) private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membershipVersion) { var current = this.clusterMembershipService.CurrentSnapshot; - - // Check if the target silo is in the cluster - if (current.Members.TryGetValue(siloAddress, out var value)) - { - // It is, check if it's alive - return value.Status.IsTerminating(); - } - - // We didn't find it in the cluster. If the silo entry is too old, it has been cleaned in the membership table: the entry isn't valid anymore. - // Otherwise, maybe the membership service isn't up to date yet. The entry should be valid - return current.Version > membershipVersion; + return siloAddress is null || current.GetSiloStatus(siloAddress, membershipVersion) == SiloStatus.Dead; } private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}"); - public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); + public void UpdateCache(GrainId grainId, SiloAddress siloAddress) + { + var membershipVersion = this.clusterMembershipService.CurrentSnapshot.Version; + cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress, MembershipVersion = membershipVersion }, (int)membershipVersion.Value); + } public void InvalidateCache(GrainId grainId) => cache.Remove(grainId); public void InvalidateCache(GrainAddress address) => cache.Remove(address); public bool TryLookupInCache(GrainId grainId, out GrainAddress address) @@ -222,10 +217,10 @@ public bool TryLookupInCache(GrainId grainId, out GrainAddress address) ThrowUnsupportedGrainType(grainId); } - if (this.cache.LookUp(grainId, out address, out var version)) + if (this.cache.LookUp(grainId, out address, out _)) { // If the silo is dead, remove the entry - if (IsKnownDeadSilo(address.SiloAddress, new MembershipVersion(version))) + if (IsKnownDeadSilo(address)) { address = default; this.cache.Remove(grainId); diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs index 2f8d58c5e41..9a79a54fba0 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs @@ -15,27 +15,26 @@ namespace Orleans.Runtime.GrainDirectory internal sealed partial class GrainDirectoryHandoffManager { private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(250); - private const int MAX_OPERATION_DEQUEUE = 2; private readonly LocalGrainDirectory localDirectory; private readonly ISiloStatusOracle siloStatusOracle; + private readonly IClusterMembershipService clusterMembershipService; private readonly IInternalGrainFactory grainFactory; private readonly ILogger logger; - private readonly Factory createPartion; private readonly Queue<(string name, object state, Func action)> pendingOperations = new(); private readonly AsyncLock executorLock = new AsyncLock(); internal GrainDirectoryHandoffManager( LocalGrainDirectory localDirectory, ISiloStatusOracle siloStatusOracle, + IClusterMembershipService clusterMembershipService, IInternalGrainFactory grainFactory, - Factory createPartion, ILoggerFactory loggerFactory) { logger = loggerFactory.CreateLogger(); this.localDirectory = localDirectory; this.siloStatusOracle = siloStatusOracle; + this.clusterMembershipService = clusterMembershipService; this.grainFactory = grainFactory; - this.createPartion = createPartion; } internal void ProcessSiloAddEvent(SiloAddress addedSilo) @@ -71,6 +70,11 @@ internal void ProcessSiloAddEvent(SiloAddress addedSilo) private async Task ProcessAddedSiloAsync(SiloAddress addedSilo, List splitPartListSingle) { if (!this.localDirectory.Running) return; + if (!addedSilo.Equals(localDirectory.FindSuccessor(localDirectory.MyAddress))) + { + LogDebugNotImmediateSuccessor(logger, addedSilo); + return; + } if (this.siloStatusOracle.GetApproximateSiloStatus(addedSilo) == SiloStatus.Active) { @@ -124,6 +128,17 @@ private async Task AcceptExistingRegistrationsAsync(List singleAct { if (!this.localDirectory.Running) return; + var snapshot = this.clusterMembershipService.CurrentSnapshot; + for (var i = singleActivations.Count - 1; i >= 0; i--) + { + if (!IsTransferableRegistration(singleActivations[i], snapshot)) + { + singleActivations.RemoveAt(i); + } + } + + if (singleActivations.Count == 0) return; + LogDebugAcceptingRegistrations(logger, singleActivations.Count); var tasks = singleActivations.Select(addr => this.localDirectory.RegisterAsync(addr, previousAddress: null, 1)).ToArray(); @@ -185,6 +200,16 @@ private async Task DestroyDuplicateActivationsAsync(Dictionary action) { lock (this) @@ -201,7 +226,6 @@ private async Task ExecutePendingOperations() { using (await executorLock.LockAsync()) { - var dequeueCount = 0; while (true) { // Get the next operation, or exit if there are none. @@ -213,34 +237,23 @@ private async Task ExecutePendingOperations() op = this.pendingOperations.Peek(); } - dequeueCount++; - try { await op.Action(this, op.State); - // Success, reset the dequeue count - dequeueCount = 0; - } - catch (Exception exception) - { - if (dequeueCount < MAX_OPERATION_DEQUEUE) - { - LogWarningOperationFailedRetry(logger, exception, op.Name); - await Task.Delay(RetryDelay); - } - else + lock (this) { - LogWarningOperationFailedNoRetry(logger, exception, op.Name); + this.pendingOperations.Dequeue(); } } - if (dequeueCount == 0 || dequeueCount >= MAX_OPERATION_DEQUEUE) + catch (Exception exception) { - lock (this) + if (!this.localDirectory.Running) { - // Remove the operation from the queue if it was a success - // or if we tried too many times - this.pendingOperations.Dequeue(); + return; } + + LogWarningOperationFailedRetry(logger, exception, op.Name); + await Task.Delay(RetryDelay); } } } @@ -335,10 +348,5 @@ private readonly struct GrainAddressesLogValue(List grainAddresses )] private static partial void LogWarningOperationFailedRetry(ILogger logger, Exception exception, string operation); - [LoggerMessage( - Level = LogLevel.Warning, - Message = "{Operation} failed, will NOT be retried" - )] - private static partial void LogWarningOperationFailedNoRetry(ILogger logger, Exception exception, string operation); } } diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs index 4e008eadc98..7b6db82ce39 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs @@ -111,7 +111,8 @@ private GrainAddress RegisterCore(GrainAddress newAddress, GrainAddress? existin return existing; } - private bool IsSiloDead(GrainAddress existing) => _owner.ClusterMembershipSnapshot.GetSiloStatus(existing.SiloAddress) == SiloStatus.Dead; + private bool IsSiloDead(GrainAddress existing) + => existing.SiloAddress is null || _owner.ClusterMembershipSnapshot.GetSiloStatus(existing.SiloAddress, existing.MembershipVersion) == SiloStatus.Dead; [LoggerMessage( Level = LogLevel.Trace, diff --git a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs index 755f355ccb4..c5e0f07f44f 100644 --- a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs @@ -95,12 +95,5 @@ internal interface ILocalGrainDirectory : IDhtGrainDirectory /// Attempts to find the specified grain in the directory cache. /// bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address); - - /// - /// For determining message forwarding logic, we sometimes check if a silo is part of this cluster or not - /// - /// the address of the silo - /// true if the silo is known to be part of this cluster - bool IsSiloInCluster(SiloAddress silo); } } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 7c97e15c82b..8a83e8f49c1 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -10,29 +10,31 @@ using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.GrainDirectory; +using Orleans.Internal; using Orleans.Runtime.Scheduler; namespace Orleans.Runtime.GrainDirectory { - internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ISiloStatusListener, ILifecycleParticipant + internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ILifecycleParticipant { private readonly ILogger log; private readonly SiloAddress? seed; private readonly ISiloStatusOracle siloStatusOracle; + private readonly IClusterMembershipService clusterMembershipService; private readonly IInternalGrainFactory grainFactory; -#if NET9_0_OR_GREATER - private readonly Lock writeLock = new(); -#else - private readonly object writeLock = new(); -#endif + private readonly ActivationDirectory localActivations; private readonly IServiceProvider _serviceProvider; + private readonly CancellationTokenSource _membershipUpdatesCancellation = new(); private DirectoryMembership directoryMembership = DirectoryMembership.Default; + private ClusterMembershipSnapshot appliedClusterMembershipSnapshot = ClusterMembershipSnapshot.Default; + private GrainDirectoryResolver? grainDirectoryResolver; + private bool hasAppliedClusterMembershipSnapshot; // Consider: move these constants into an appropriate place internal const int HOP_LIMIT = 6; // forward a remote request no more than 5 times public static readonly TimeSpan RETRY_DELAY = TimeSpan.FromMilliseconds(200); // Pause 200ms between forwards to let the membership directory settle down internal bool Running; - private Catalog? _catalog; + private Task? membershipUpdatesTask; internal SiloAddress MyAddress { get; } @@ -51,6 +53,7 @@ public LocalGrainDirectory( IServiceProvider serviceProvider, ILocalSiloDetails siloDetails, ISiloStatusOracle siloStatusOracle, + IClusterMembershipService clusterMembershipService, IInternalGrainFactory grainFactory, Factory grainDirectoryPartitionFactory, IOptions developmentClusterMembershipOptions, @@ -63,7 +66,9 @@ public LocalGrainDirectory( MyAddress = siloDetails.SiloAddress; this.siloStatusOracle = siloStatusOracle; + this.clusterMembershipService = clusterMembershipService; this.grainFactory = grainFactory; + this.localActivations = systemTargetShared.ActivationDirectory; DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value, out this.disposeDirectoryCache); @@ -74,7 +79,7 @@ public LocalGrainDirectory( } DirectoryPartition = grainDirectoryPartitionFactory(); - HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, grainFactory, grainDirectoryPartitionFactory, loggerFactory); + HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, clusterMembershipService, grainFactory, loggerFactory); // When DistributedGrainDirectory is active, it registers its own IRemoteGrainDirectory system targets. // In that case, create the RemoteGrainDirectory objects (still needed for WorkItemGroup scheduling) @@ -96,16 +101,15 @@ public LocalGrainDirectory( DistributedGrainDirectoryPartitionCompatibilities = compatibilityPartitions.MoveToImmutable(); } - // add myself to the list of members - AddServer(MyAddress); + this.directoryMembership = DirectoryMembership.Create([MyAddress]); DirectoryInstruments.RegisterDirectoryPartitionSizeObserve(() => DirectoryPartition.Count); DirectoryInstruments.RegisterMyPortionRingDistanceObserve(() => RingDistanceToSuccessor()); - DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => (((float)this.RingDistanceToSuccessor()) / ((float)(int.MaxValue * 2L))) * 100); + DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => this.RingDistanceToSuccessor() / (float)(int.MaxValue * 2L) * 100); DirectoryInstruments.RegisterMyPortionAverageRingPercentageObserve(() => { var ring = this.directoryMembership.MembershipRingList; - return ring.Count == 0 ? 0 : ((float)100 / (float)ring.Count); + return ring.Count == 0 ? 0 : (100 / (float)ring.Count); }); DirectoryInstruments.RegisterRingSizeObserve(() => this.directoryMembership.MembershipRingList.Count); _serviceProvider = serviceProvider; @@ -116,8 +120,7 @@ public void Start() LogDebugStart(); Running = true; - - siloStatusOracle.SubscribeToSiloStatusEvents(this); + membershipUpdatesTask = Task.Run(() => ProcessMembershipUpdates(_membershipUpdatesCancellation.Token)); } // Note that this implementation stops processing directory change requests (Register, Unregister, etc.) when the Stop event is raised. @@ -135,139 +138,304 @@ public async Task StopAsync() //mark Running as false will exclude myself from CalculateGrainDirectoryPartition(grainId) Running = false; + _membershipUpdatesCancellation.Cancel(); + if (membershipUpdatesTask is { } task) + { + await task.SuppressThrowing(); + } if (this.disposeDirectoryCache) { - await GrainDirectoryCacheFactory.DisposeGrainDirectoryCacheAsync(DirectoryCache); + await GrainDirectoryCacheFactory.DisposeGrainDirectoryCacheAsync(DirectoryCache).SuppressThrowing(); } DirectoryPartition.Clear(); DirectoryCache.Clear(); } - private void AddServer(SiloAddress silo) + private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) + { + while (!cancellationToken.IsCancellationRequested) + { + try + { + await ApplyMembershipSnapshot(); + + await foreach (var _ in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken)) + { + // Always apply the latest snapshot. + await ApplyMembershipSnapshot(); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception exception) + { + LogErrorProcessingMembershipUpdates(exception); + + try + { + await clusterMembershipService.Refresh(cancellationToken: cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception refreshException) + { + LogWarningRefreshingClusterMembershipFailed(refreshException); + } + + await Task.Delay(RETRY_DELAY, cancellationToken).SuppressThrowing(); + } + } + } + + private Task ApplyMembershipSnapshot() { - lock (this.writeLock) + return CacheValidator.RunOrQueueTask(() => { - var existing = this.directoryMembership; - if (existing.MembershipCache.Contains(silo)) + ApplyMembershipSnapshotCore(); + return Task.CompletedTask; + }); + + void ApplyMembershipSnapshotCore() + { + if (!Running) + { + return; + } + + var snapshot = clusterMembershipService.CurrentSnapshot; + var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default; + if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version) { - // we have already cached this silo return; } - // insert new silo in the sorted order - long hash = silo.GetConsistentHashCode(); + var previousMembership = CreateDirectoryMembership(previousSnapshot); + var targetMembership = CreateDirectoryMembership(snapshot); + this.directoryMembership = targetMembership; - // Find the last silo with hash smaller than the new silo, and insert the latter after (this is why we have +1 here) the former. - // Notice that FindLastIndex might return -1 if this should be the first silo in the list, but then - // 'index' will get 0, as needed. - int index = existing.MembershipRingList.FindLastIndex(siloAddr => siloAddr.GetConsistentHashCode() < hash) + 1; + var removedSilos = GetMembershipDifference(previousMembership, targetMembership); + var addedSilos = GetMembershipDifference(targetMembership, previousMembership); + + ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership); + AdjustLocalDirectory(snapshot); + AdjustLocalCache(snapshot, targetMembership); + + foreach (var silo in removedSilos) + { + LogDebugSiloRemovedSilo(MyAddress, silo); + } - this.directoryMembership = new DirectoryMembership( - existing.MembershipRingList.Insert(index, silo), - existing.MembershipCache.Add(silo)); + foreach (var silo in addedSilos) + { + HandoffManager.ProcessSiloAddEvent(silo); + LogDebugSiloAddedSilo(MyAddress, silo); + } - HandoffManager.ProcessSiloAddEvent(silo); + appliedClusterMembershipSnapshot = snapshot; + hasAppliedClusterMembershipSnapshot = true; + } + } - AdjustLocalDirectory(silo, dead: false); - AdjustLocalCache(silo, dead: false); + private void ProcessSiloStatusChanges( + ClusterMembershipSnapshot snapshot, + ClusterMembershipSnapshot previousSnapshot, + DirectoryMembership previousMembership) + { + var changes = previousSnapshot.Version != MembershipVersion.MinValue + ? snapshot.CreateUpdate(previousSnapshot) + : snapshot.AsUpdate(); + var statusChanges = new List(); + foreach (var change in changes.Changes) + { + if (!change.SiloAddress.Equals(MyAddress) && change.Status.IsTerminating()) + { + statusChanges.Add(change); + } + } - LogDebugSiloAddedSilo(MyAddress, silo); + statusChanges.Sort(static (left, right) => CompareSiloAddress(left.SiloAddress, right.SiloAddress)); + foreach (var change in statusChanges) + { + OnSiloStatusChange(previousMembership, change.SiloAddress, change.Status); } } - private void RemoveServer(SiloAddress silo, SiloStatus status) + private DirectoryMembership CreateDirectoryMembership(ClusterMembershipSnapshot snapshot) { - lock (this.writeLock) + var members = new List(); + foreach (var member in snapshot.Members) { - try + if (member.Value.Status == SiloStatus.Active) { - // Only notify the catalog once. Order is important: call BEFORE updating membershipRingList. - _catalog = _serviceProvider.GetRequiredService(); - _catalog.OnSiloStatusChange(this, silo, status); + members.Add(member.Key); } - catch (Exception exc) + } + + if (Running && !members.Contains(MyAddress)) + { + members.Add(MyAddress); + } + + return DirectoryMembership.Create(members); + } + + private List GetMembershipDifference( + DirectoryMembership currentMembership, + DirectoryMembership otherMembership) + { + var result = new List(); + foreach (var silo in currentMembership.MembershipRingList) + { + if (!silo.Equals(MyAddress) && !otherMembership.MembershipCache.Contains(silo)) { - LogErrorCatalogSiloStatusChangeNotificationException(exc, new(silo)); + result.Add(silo); } + } - var existing = this.directoryMembership; - if (!existing.MembershipCache.Contains(silo)) + return result; + } + + private Task RefreshMembershipIfNewer(List addresses) + { + var targetVersion = MembershipVersion.MinValue; + foreach (var address in addresses) + { + if (address.MembershipVersion > targetVersion) { - // we have already removed this silo - return; + targetVersion = address.MembershipVersion; } + } - this.directoryMembership = new DirectoryMembership( - existing.MembershipRingList.Remove(silo), - existing.MembershipCache.Remove(silo)); + return RefreshMembershipIfNewer(targetVersion); + } - AdjustLocalDirectory(silo, dead: true); - AdjustLocalCache(silo, dead: true); + private async Task RefreshMembershipIfNewer(MembershipVersion targetVersion) + { + if (targetVersion <= appliedClusterMembershipSnapshot.Version) + { + return; + } - LogDebugSiloRemovedSilo(MyAddress, silo); + if (targetVersion > clusterMembershipService.CurrentSnapshot.Version) + { + await clusterMembershipService.Refresh(targetVersion); } + + await ApplyMembershipSnapshot(); } - /// - /// Adjust local directory following the addition/removal of a silo - /// - private void AdjustLocalDirectory(SiloAddress silo, bool dead) + private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddress updatedSilo, SiloStatus status) + { + if (updatedSilo.Equals(MyAddress) || !status.IsTerminating()) + { + return; + } + + var activationsToShutdown = new List(); + var resolver = grainDirectoryResolver ??= _serviceProvider.GetRequiredService(); + foreach (var activation in localActivations) + { + try + { + var activationData = activation.Value; + var placementStrategy = activationData.GetComponent(); + var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true }; + if (!isUsingGrainDirectory || !resolver.IsUsingDefaultDirectory(activationData.GrainId.Type)) + { + continue; + } + + if (!updatedSilo.Equals(CalculateGrainDirectoryPartition(activationData.GrainId, previousMembership))) + { + continue; + } + + activationsToShutdown.Add(activationData); + } + catch (Exception exception) + { + LogErrorSiloStatusChangeNotification(new(updatedSilo), exception); + } + } + + if (activationsToShutdown.Count == 0) + { + return; + } + + LogInfoSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo), status); + + var reasonText = $"This activation is being deactivated because server {updatedSilo} entered status {status} and was responsible for this activation's grain directory registration."; + var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText); + foreach (var activation in activationsToShutdown) + { + try + { + activation.Deactivate(reason, CancellationToken.None); + } + catch (Exception exception) + { + LogErrorDeactivatingActivationForRemovedSilo(exception, activation.GrainId, new(updatedSilo)); + } + } + } + + private void AdjustLocalDirectory(ClusterMembershipSnapshot snapshot) { - // Determine which activations to remove. var activationsToRemove = new List<(GrainId, ActivationId)>(); foreach (var entry in this.DirectoryPartition.GetItems()) { if (entry.Value.Activation is { } address) { - // Include any activations from dead silos and from predecessors. - if (dead && address.SiloAddress!.Equals(silo) || address.SiloAddress!.IsPredecessorOf(silo)) + if (IsDefunctActivation(address, snapshot)) { activationsToRemove.Add((entry.Key, address.ActivationId)); } } } - // Remove all defunct activations. foreach (var activation in activationsToRemove) { DirectoryPartition.RemoveActivation(activation.Item1, activation.Item2); } } - /// Adjust local cache following the removal of a silo by dropping: - /// 1) entries that point to activations located on the removed silo - /// 2) entries for grains that are now owned by this silo (me) - /// 3) entries for grains that were owned by this removed silo - we currently do NOT do that. - /// If we did 3, we need to do that BEFORE we change the membershipRingList (based on old Membership). - /// We don't do that since first cache refresh handles that. - /// Second, since Membership events are not guaranteed to be ordered, we may remove a cache entry that does not really point to a failed silo. - /// To do that properly, we need to store for each cache entry who was the directory owner that registered this activation (the original partition owner). - private void AdjustLocalCache(SiloAddress silo, bool dead) + private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembership targetMembership) { - // remove all records of activations located on the removed silo foreach (var tuple in DirectoryCache.KeyValues) { var activationAddress = tuple.ActivationAddress; - // 2) remove entries now owned by me (they should be retrieved from my directory partition) - if (MyAddress.Equals(CalculateGrainDirectoryPartition(activationAddress.GrainId))) + // Remove entries now owned by me. They should be retrieved from my directory partition. + if (MyAddress.Equals(CalculateGrainDirectoryPartition(activationAddress.GrainId, targetMembership))) { DirectoryCache.Remove(activationAddress.GrainId); continue; } - // 1) remove entries that point to activations located on the removed silo - // For dead silos, remove any activation registered to that silo or one of its predecessors. - // For new silos, remove any activation registered to one of its predecessors. - if (activationAddress.SiloAddress!.IsPredecessorOf(silo) || dead && activationAddress.SiloAddress.Equals(silo)) + if (IsDefunctActivation(activationAddress, snapshot)) { DirectoryCache.Remove(activationAddress.GrainId); } } } + internal static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) + { + if (address.SiloAddress is not { } silo) + { + return true; + } + + return snapshot.GetSiloStatus(silo, address.MembershipVersion) == SiloStatus.Dead; + } + internal SiloAddress? FindPredecessor(SiloAddress silo) { var existing = directoryMembership.MembershipRingList; @@ -294,24 +462,6 @@ private void AdjustLocalCache(SiloAddress silo, bool dead) return existing.Count > 1 ? existing[(index + 1) % existing.Count] : null; } - public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus status) - { - // This silo's status has changed - if (!Equals(updatedSilo, MyAddress)) // Status change for some other silo - { - if (status.IsTerminating()) - { - // QueueAction up the "Remove" to run on a system turn - CacheValidator.WorkItemGroup.QueueAction(() => RemoveServer(updatedSilo, status)); - } - else if (status == SiloStatus.Active) // do not do anything with SiloStatus.Starting -- wait until it actually becomes active - { - // QueueAction up the "Remove" to run on a system turn - CacheValidator.WorkItemGroup.QueueAction(() => AddServer(updatedSilo)); - } - } - } - private bool IsValidSilo(SiloAddress? silo) => siloStatusOracle.IsFunctionalDirectory(silo); /// @@ -321,6 +471,9 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta /// /// public SiloAddress? CalculateGrainDirectoryPartition(GrainId grainId) + => CalculateGrainDirectoryPartition(grainId, this.directoryMembership); + + private SiloAddress? CalculateGrainDirectoryPartition(GrainId grainId, DirectoryMembership membership) { // give a special treatment for special grains if (grainId.IsSystemTarget()) @@ -352,7 +505,7 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta // is doing something valuable. bool excludeMySelf = !Running; - var existing = this.directoryMembership; + var existing = membership; if (existing.MembershipRingList.Count == 0) { // If the membership ring is empty, then we're the owner by default unless we're stopping. @@ -427,11 +580,14 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres DirectoryInstruments.RegistrationsSingleActIssued.Add(1); } + await RefreshMembershipIfNewer(address.MembershipVersion); + // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); @@ -468,7 +624,7 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres // this way next local lookup will find this ActivationAddress in the cache and we will save a full lookup! if (result.Address == null) return result; - if (!address.Equals(result.Address) || !IsValidSilo(address.SiloAddress)) return result; + if (!address.Equals(result.Address) || IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) return result; // update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup. DirectoryCache.AddOrUpdate(result.Address, result.VersionTag); @@ -477,20 +633,22 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres } } - public Task UnregisterAfterNonexistingActivation(GrainAddress addr, SiloAddress origin) + public async Task UnregisterAfterNonexistingActivation(GrainAddress addr, SiloAddress origin) { LogTraceUnregisterAfterNonexistingActivation(addr, origin); + await RefreshMembershipIfNewer(addr.MembershipVersion); + if (origin == null || this.directoryMembership.MembershipCache.Contains(origin)) { // the request originated in this cluster, call unregister here - return UnregisterAsync(addr, UnregistrationCause.NonexistentActivation, 0); + await UnregisterAsync(addr, UnregistrationCause.NonexistentActivation, 0); } else { // the request originated in another cluster, call unregister there var remoteDirectory = GetDirectoryReference(origin); - return remoteDirectory.UnregisterAsync(addr, UnregistrationCause.NonexistentActivation); + await remoteDirectory.UnregisterAsync(addr, UnregistrationCause.NonexistentActivation); } } @@ -508,11 +666,14 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus if (hopCount == 0) InvalidateCacheEntry(address); + await RefreshMembershipIfNewer(address.MembershipVersion); + // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); @@ -562,7 +723,6 @@ private void UnregisterOrPutInForwardList(List addresses, Unregist } } - public async Task UnregisterManyAsync(List addresses, UnregistrationCause cause, int hopCount) { if (hopCount > 0) @@ -574,12 +734,15 @@ public async Task UnregisterManyAsync(List addresses, Unregistrati DirectoryInstruments.UnregistrationsManyIssued.Add(1); } + await RefreshMembershipIfNewer(addresses); + Dictionary>? forwardlist = null; UnregisterOrPutInForwardList(addresses, cause, hopCount, ref forwardlist, "UnregisterManyAsync"); - // before forwarding to other silos, we insert a retry delay and re-check destination - if (hopCount > 0 && forwardlist != null) + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. + if (hopCount > 1 && forwardlist != null) { await Task.Delay(RETRY_DELAY); Dictionary>? forwardlist2 = null; @@ -668,7 +831,15 @@ public bool LocalLookup(GrainId grain, out AddressAndTag result) public AddressAndTag GetLocalDirectoryData(GrainId grain) => DirectoryPartition.LookUpActivation(grain); - public GrainAddress? GetLocalCacheData(GrainId grain) => DirectoryCache.LookUp(grain, out var cache) && IsValidSilo(cache.SiloAddress) ? cache : null; + public GrainAddress? GetLocalCacheData(GrainId grain) + { + if (!DirectoryCache.LookUp(grain, out var cache)) + { + return null; + } + + return IsDefunctActivation(cache, clusterMembershipService.CurrentSnapshot) ? null : cache; + } public async Task LookupAsync(GrainId grainId, int hopCount = 0) { @@ -684,8 +855,9 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "LookUpAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "LookUpAsync"); @@ -729,7 +901,7 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) var result = await GetDirectoryReference(forwardAddress).LookupAsync(grainId, hopCount + 1); // update the cache - if (result.Address is { } address && IsValidSilo(address.SiloAddress)) + if (result.Address is { } address && !IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) { DirectoryCache.AddOrUpdate(address, result.VersionTag); } @@ -745,8 +917,9 @@ public async Task DeleteGrainAsync(GrainId grainId, int hopCount) // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "DeleteGrainAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // The first forwarded owner (hopCount == 1) forwards immediately if ownership changed again. + // Once the request has already been re-forwarded, pause and recheck before bouncing it again. + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(grainId, hopCount, "DeleteGrainAsync"); @@ -812,9 +985,10 @@ private bool IsSiloNextInTheRing(SiloAddress siloAddr, int hash, bool excludeMyS return siloAddr.GetConsistentHashCode() <= hash && (!excludeMySelf || !siloAddr.Equals(MyAddress)); } - public bool IsSiloInCluster(SiloAddress silo) + private static int CompareSiloAddress(SiloAddress left, SiloAddress right) { - return this.directoryMembership.MembershipCache.Contains(silo); + var hashComparison = left.GetConsistentHashCode().CompareTo(right.GetConsistentHashCode()); + return hashComparison != 0 ? hashComparison : left.CompareTo(right); } public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress) => this.DirectoryCache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); @@ -852,11 +1026,37 @@ private readonly struct SiloHashLogValue(SiloAddress? silo) private partial void LogDebugSiloAddedSilo(SiloAddress siloAddress, SiloAddress otherSiloAddress); [LoggerMessage( - EventId = (int)ErrorCode.Directory_SiloStatusChangeNotification_Exception, Level = LogLevel.Error, - Message = "CatalogSiloStatusListener.SiloStatusChangeNotification has thrown an exception when notified about removed silo {Silo}." + EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception, + Message = "LocalGrainDirectory has thrown an exception while handling removal of silo {Silo}." )] - private partial void LogErrorCatalogSiloStatusChangeNotificationException(Exception exception, SiloAddressLogValue silo); + private partial void LogErrorSiloStatusChangeNotification(SiloAddressLogValue silo, Exception exception); + + [LoggerMessage( + Level = LogLevel.Information, + EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification, + Message = "LocalGrainDirectory is deactivating {Count} activations because silo {Silo} entered status {Status} and was the primary directory partition for these grain ids." + )] + private partial void LogInfoSiloStatusChangeNotification(int count, SiloAddressLogValue silo, SiloStatus status); + + [LoggerMessage( + Level = LogLevel.Error, + EventId = (int)ErrorCode.Catalog_DeactivateActivation_Exception, + Message = "LocalGrainDirectory has thrown an exception while deactivating activation {GrainId} due to removal of silo {Silo}." + )] + private partial void LogErrorDeactivatingActivationForRemovedSilo(Exception exception, GrainId grainId, SiloAddressLogValue silo); + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Error processing cluster membership updates." + )] + private partial void LogErrorProcessingMembershipUpdates(Exception exception); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Failed to refresh cluster membership after a directory membership update processing error." + )] + private partial void LogWarningRefreshingClusterMembershipFailed(Exception exception); [LoggerMessage( Level = LogLevel.Debug, @@ -974,19 +1174,21 @@ private readonly struct SiloHashLogValue(SiloAddress? silo) )] private partial void LogWarningDeleteGrainAsyncNotOwner(GrainId grainId, SiloAddress? forwardAddress, int hopCount); - private class DirectoryMembership + private class DirectoryMembership(ImmutableList membershipRingList, ImmutableHashSet membershipCache) { - public DirectoryMembership(ImmutableList membershipRingList, ImmutableHashSet membershipCache) + public static DirectoryMembership Default { get; } = new DirectoryMembership([], []); + + public static DirectoryMembership Create(IEnumerable members) { - this.MembershipRingList = membershipRingList; - this.MembershipCache = membershipCache; + var builder = ImmutableList.CreateBuilder(); + builder.AddRange(members); + builder.Sort(CompareSiloAddress); + var ring = builder.ToImmutable(); + return new DirectoryMembership(ring, [.. ring]); } - public static DirectoryMembership Default { get; } = new DirectoryMembership(ImmutableList.Empty, ImmutableHashSet.Empty); - - - public ImmutableList MembershipRingList { get; } - public ImmutableHashSet MembershipCache { get; } + public ImmutableList MembershipRingList { get; } = membershipRingList; + public ImmutableHashSet MembershipCache { get; } = membershipCache; } } } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs index f27d5e15c86..90c6d752f10 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs @@ -112,21 +112,22 @@ internal sealed partial class LocalGrainDirectoryPartition private Dictionary partitionData; private readonly object lockable; private readonly ILogger log; - private readonly ISiloStatusOracle siloStatusOracle; + private readonly IClusterMembershipService clusterMembershipService; private readonly IOptions grainDirectoryOptions; internal int Count { get { return partitionData.Count; } } - public LocalGrainDirectoryPartition(ISiloStatusOracle siloStatusOracle, IOptions grainDirectoryOptions, ILoggerFactory loggerFactory) + public LocalGrainDirectoryPartition(IClusterMembershipService clusterMembershipService, IOptions grainDirectoryOptions, ILoggerFactory loggerFactory) { partitionData = new Dictionary(); lockable = new object(); log = loggerFactory.CreateLogger(); - this.siloStatusOracle = siloStatusOracle; + this.clusterMembershipService = clusterMembershipService; this.grainDirectoryOptions = grainDirectoryOptions; } - private bool IsValidSilo(SiloAddress? silo) => silo is not null && siloStatusOracle.IsFunctionalDirectory(silo); + private bool IsDefunctActivation(GrainAddress? address) + => address is null || LocalGrainDirectory.IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot); internal void Clear() { @@ -156,9 +157,11 @@ internal AddressAndTag AddSingleActivation(GrainAddress address, GrainAddress? p { LogTraceAddingSingleActivation(address.SiloAddress, address.GrainId, address.ActivationId); - if (!IsValidSilo(address.SiloAddress)) + if (IsDefunctActivation(address)) { - var siloStatus = this.siloStatusOracle.GetApproximateSiloStatus(address.SiloAddress); + var siloStatus = address.SiloAddress is { } siloAddress + ? this.clusterMembershipService.CurrentSnapshot.GetSiloStatus(siloAddress, address.MembershipVersion) + : SiloStatus.None; throw new OrleansException($"Trying to register {address.GrainId} on invalid silo: {address.SiloAddress}. Known status: {siloStatus}"); } @@ -170,10 +173,8 @@ internal AddressAndTag AddSingleActivation(GrainAddress address, GrainAddress? p } else { - var siloAddress = grainInfo.Activation?.SiloAddress; - // If there is an existing entry pointing to an invalid silo then remove it - if (siloAddress != null && !IsValidSilo(siloAddress)) + if (IsDefunctActivation(grainInfo.Activation)) { partitionData[address.GrainId] = grainInfo = new GrainInfo(); } @@ -230,7 +231,7 @@ internal AddressAndTag LookUpActivation(GrainId grain) result = new(grainInfo.Activation, grainInfo.VersionTag); } - if (!IsValidSilo(result.Address?.SiloAddress)) + if (IsDefunctActivation(result.Address)) { result = new(null, result.VersionTag); } @@ -308,7 +309,7 @@ internal List Split(Predicate predicate) for (var i = result.Count - 1; i >= 0; i--) { - if (!IsValidSilo(result[i].SiloAddress)) + if (IsDefunctActivation(result[i])) { result.RemoveAt(i); } diff --git a/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs b/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs index e69dfbd7145..1e12c444632 100644 --- a/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs +++ b/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs @@ -61,6 +61,18 @@ public SiloStatus GetSiloStatus(SiloAddress silo) return status; } + /// + /// Gets status of the specified silo, treating unknown silos as dead if this snapshot is newer than when the silo was seen. + /// + /// The silo. + /// The membership version when the silo was last seen. + /// The status of the specified silo. + public SiloStatus GetSiloStatus(SiloAddress silo, MembershipVersion seenAtVersion) + { + var status = GetSiloStatus(silo); + return status == SiloStatus.None && this.Version > seenAtVersion ? SiloStatus.Dead : status; + } + /// /// Returns a which represents this instance. /// diff --git a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs index 2af9f4c3006..bc455ee8ab7 100644 --- a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs +++ b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs @@ -11,7 +11,7 @@ namespace Orleans.Runtime.MembershipService; /// /// Manages instances. /// -internal partial class SiloStatusListenerManager : ILifecycleParticipant +internal sealed partial class SiloStatusListenerManager : ILifecycleParticipant { #if NET9_0_OR_GREATER private readonly Lock _listenersLock = new(); diff --git a/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs b/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs index c276aaf0adf..4eb336b1415 100644 --- a/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs +++ b/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs @@ -9,15 +9,18 @@ internal partial class SiloConnectionMaintainer : ILifecycleParticipant log; public SiloConnectionMaintainer( ConnectionManager connectionManager, ISiloStatusOracle siloStatusOracle, + IRuntimeClient runtimeClient, ILogger log) { this.connectionManager = connectionManager; this.siloStatusOracle = siloStatusOracle; + this.runtimeClient = runtimeClient; this.log = log; } @@ -42,6 +45,7 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta { if (status == SiloStatus.Dead && updatedSilo != siloStatusOracle.SiloAddress) { + this.runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo); _ = Task.Run(() => this.CloseConnectionAsync(updatedSilo)); } } @@ -68,4 +72,4 @@ private async Task CloseConnectionAsync(SiloAddress silo) )] private static partial void LogExceptionWhileClosingConnections(ILogger logger, SiloAddress siloAddress, Exception exception); } -} \ No newline at end of file +} diff --git a/src/api/Orleans.Runtime/Orleans.Runtime.cs b/src/api/Orleans.Runtime/Orleans.Runtime.cs index c3577927a1d..19168727c55 100644 --- a/src/api/Orleans.Runtime/Orleans.Runtime.cs +++ b/src/api/Orleans.Runtime/Orleans.Runtime.cs @@ -547,6 +547,8 @@ public ClusterMembershipSnapshot(System.Collections.Immutable.ImmutableDictionar public SiloStatus GetSiloStatus(SiloAddress silo) { throw null; } + public SiloStatus GetSiloStatus(SiloAddress silo, MembershipVersion seenAtVersion) { throw null; } + public override string ToString() { throw null; } } diff --git a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs index db66ed56d2c..d15b93101c3 100644 --- a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs +++ b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs @@ -140,12 +140,13 @@ public async Task LocalGrainDirectoryStopDoesNotDisposeRegisteredCustomCache() localSiloDetails.Name.Returns("TestSilo"); localSiloDetails.ClusterId.Returns("TestCluster"); var siloStatusOracle = Substitute.For(); + var membershipService = new MockClusterMembershipService(); var grainFactory = Substitute.For(); var services = new ServiceCollection() .AddSingleton(cache) .BuildServiceProvider(); Factory partitionFactory = () => new LocalGrainDirectoryPartition( - siloStatusOracle, + membershipService.Target, Options.Create(new GrainDirectoryOptions()), this.loggerFactory); var systemTargetShared = new SystemTargetShared( @@ -160,6 +161,7 @@ public async Task LocalGrainDirectoryStopDoesNotDisposeRegisteredCustomCache() serviceProvider: services, siloDetails: localSiloDetails, siloStatusOracle: siloStatusOracle, + clusterMembershipService: membershipService.Target, grainFactory: grainFactory, grainDirectoryPartitionFactory: partitionFactory, developmentClusterMembershipOptions: Options.Create(new DevelopmentClusterMembershipOptions()), @@ -386,6 +388,35 @@ public async Task LocalLookupWhenEntryExistsButSiloIsDead() await this.lifecycle.OnStop(); } + [Theory] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public async Task LocalLookupWhenCachedEntrySiloIsTerminatingButNotDead(SiloStatus status) + { + var silo = GenerateSiloAddress(); + + // Setup membership service + this.mockMembershipService.UpdateSiloStatus(silo, SiloStatus.Active, "silo"); + await this.lifecycle.OnStart(); + await WaitUntilClusterChangePropagated(); + + var address = GenerateGrainAddress(silo); + this.grainDirectory.Register(address, previousAddress: null).Returns(address); + + await this.grainLocator.Register(address, previousAddress: null); + Assert.True(this.grainLocator.TryLookupInCache(address.GrainId, out var cached)); + Assert.Equal(address, cached); + + this.mockMembershipService.UpdateSiloStatus(silo, status, "silo"); + await WaitUntilClusterChangePropagated(); + + Assert.True(this.grainLocator.TryLookupInCache(address.GrainId, out cached)); + Assert.Equal(address, cached); + await this.grainDirectory.DidNotReceive().UnregisterSilos(Arg.Any>()); + + await this.lifecycle.OnStop(); + } + /// /// Tests that the locator properly cleans up cached entries when a silo dies. /// This is critical for preventing requests from being sent to dead silos. @@ -434,6 +465,51 @@ await this.grainDirectory await this.lifecycle.OnStop(); } + [Fact] + public async Task CleanupWhenSiloIsDeadOnlyProcessesIncrementalChanges() + { + var expectedSilo = GenerateSiloAddress(); + var outdatedSilo = GenerateSiloAddress(); + + this.mockMembershipService.UpdateSiloStatus(expectedSilo, SiloStatus.Active, "exp"); + this.mockMembershipService.UpdateSiloStatus(outdatedSilo, SiloStatus.Active, "old"); + await this.lifecycle.OnStart(); + await WaitUntilClusterChangePropagated(); + + this.mockMembershipService.UpdateSiloStatus(outdatedSilo, SiloStatus.Dead, "old"); + await WaitUntilClusterChangePropagated(); + + await this.grainDirectory + .Received(1) + .UnregisterSilos(Arg.Is>(list => list.Count == 1 && list.Contains(outdatedSilo))); + + this.mockMembershipService.UpdateSiloStatus(expectedSilo, SiloStatus.Active, "exp2"); + await WaitUntilClusterChangePropagated(); + + await this.grainDirectory + .Received(1) + .UnregisterSilos(Arg.Is>(list => list.Count == 1 && list.Contains(outdatedSilo))); + + await this.lifecycle.OnStop(); + } + + [Fact] + public async Task UpdateCacheStampsCurrentMembershipVersion() + { + await this.lifecycle.OnStart(); + + var grainId = GrainId.Create(GrainType.Create("test"), GrainIdKeyExtensions.CreateGuidKey(Guid.NewGuid())); + var silo = GenerateSiloAddress(); + + this.grainLocator.UpdateCache(grainId, silo); + + Assert.True(this.grainLocator.TryLookupInCache(grainId, out var cached)); + Assert.Equal(silo, cached.SiloAddress); + Assert.Equal(this.mockMembershipService.CurrentVersion, cached.MembershipVersion); + + await this.lifecycle.OnStop(); + } + [Fact] public async Task UnregisterCallDirectoryAndCleanCache() { diff --git a/test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs b/test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs new file mode 100644 index 00000000000..2cd4b65b070 --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs @@ -0,0 +1,48 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("Membership")] +public class ClusterMembershipSnapshotTests +{ + [Fact] + public void GetSiloStatus_ReturnsDeadForUnknownSiloSeenAtOlderVersion() + { + var unknownSilo = CreateSiloAddress(1); + var knownSilo = CreateSiloAddress(1, port: 11112); + var snapshot = CreateSnapshot(new ClusterMember(knownSilo, SiloStatus.Active, "known"), version: 2); + + Assert.Equal(SiloStatus.Dead, snapshot.GetSiloStatus(unknownSilo, new MembershipVersion(1))); + } + + [Theory] + [InlineData(2)] + [InlineData(3)] + public void GetSiloStatus_ReturnsNoneForUnknownSiloSeenAtCurrentOrNewerVersion(long seenAtVersion) + { + var unknownSilo = CreateSiloAddress(1); + var knownSilo = CreateSiloAddress(1, port: 11112); + var snapshot = CreateSnapshot(new ClusterMember(knownSilo, SiloStatus.Active, "known"), version: 2); + + Assert.Equal(SiloStatus.None, snapshot.GetSiloStatus(unknownSilo, new MembershipVersion(seenAtVersion))); + } + + [Fact] + public void GetSiloStatus_ReturnsDeadForSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.Equal(SiloStatus.Dead, snapshot.GetSiloStatus(silo, new MembershipVersion(2))); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +} diff --git a/test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs new file mode 100644 index 00000000000..787179ab70b --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs @@ -0,0 +1,73 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Orleans.Runtime.GrainDirectory; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("GrainDirectory")] +public class GrainDirectoryHandoffManagerTests +{ + [Theory] + [InlineData(SiloStatus.Active, true)] + [InlineData(SiloStatus.ShuttingDown, true)] + [InlineData(SiloStatus.Stopping, true)] + [InlineData(SiloStatus.Dead, false)] + public void IsTransferableRegistration_UsesSnapshotStatus(SiloStatus status, bool expected) + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(silo, status, "silo"), version: 2); + + Assert.Equal(expected, GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + [Fact] + public void IsTransferableRegistration_AllowsUnknownSiloWithoutNewerMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.True(GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + [Fact] + public void IsTransferableRegistration_RejectsUnknownSiloWithOlderMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 1); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.False(GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + [Fact] + public void IsTransferableRegistration_RejectsSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.False(GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static GrainAddress CreateGrainAddress(SiloAddress siloAddress, long membershipVersion) + => new() + { + GrainId = GrainId.Create("test-grain", Guid.NewGuid().ToString("N")), + ActivationId = ActivationId.NewId(), + SiloAddress = siloAddress, + MembershipVersion = new MembershipVersion(membershipVersion) + }; + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +} diff --git a/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs index 4f7582d2655..e9fe185227c 100644 --- a/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs +++ b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs @@ -31,15 +31,16 @@ namespace UnitTests; public class GrainDirectoryPartitionTests { private readonly LocalGrainDirectoryPartition _target; - private readonly MockSiloStatusOracle _siloStatusOracle; - private static readonly SiloAddress LocalSiloAddress = SiloAddress.FromParsableString("127.0.0.1:11111@123"); - private static readonly SiloAddress OtherSiloAddress = SiloAddress.FromParsableString("127.0.0.2:11111@456"); + private readonly MockClusterMembershipService _clusterMembershipService; + private static readonly SiloAddress LocalSiloAddress = SiloAddress.FromParsableString("127.0.0.1:11111@123"); + private static readonly SiloAddress OtherSiloAddress = SiloAddress.FromParsableString("127.0.0.2:11111@456"); public GrainDirectoryPartitionTests() { - _siloStatusOracle = new MockSiloStatusOracle(); + _clusterMembershipService = new MockClusterMembershipService(); + _clusterMembershipService.SetSiloStatus(LocalSiloAddress, SiloStatus.Active); _target = new LocalGrainDirectoryPartition( - _siloStatusOracle, + _clusterMembershipService, Options.Create(new GrainDirectoryOptions()), new LoggerFactory()); } @@ -53,7 +54,7 @@ public GrainDirectoryPartitionTests() [Fact] public void OverrideDeadEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var firstGrainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -62,7 +63,7 @@ public void OverrideDeadEntryTest() var firstRegister = _target.AddSingleActivation(firstGrainAddress, previousAddress: null); Assert.Equal(firstGrainAddress, firstRegister.Address); - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); // Previous entry is now pointing to a dead silo, it should be possible to override it now var secondGrainAddress = GrainAddress.NewActivationAddress(LocalSiloAddress, grainId); @@ -79,7 +80,7 @@ public void OverrideDeadEntryTest() [Fact] public void DoNotInsertInvalidEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -97,7 +98,7 @@ public void DoNotInsertInvalidEntryTest() [Fact] public void DoNotOverrideValidEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -121,7 +122,7 @@ public void DoNotOverrideValidEntryTest() [Fact] public void OverrideValidEntryIfMatchesTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -145,7 +146,7 @@ public void OverrideValidEntryIfMatchesTest() [Fact] public void DoNotOverrideValidEntryIfNoMatchTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -176,7 +177,7 @@ public void DoNotOverrideValidEntryIfNoMatchTest() [Fact] public void DoNotReturnInvalidEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress1 = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -185,65 +186,54 @@ public void DoNotReturnInvalidEntryTest() var register1 = _target.AddSingleActivation(grainAddress1, previousAddress: null); Assert.Equal(grainAddress1, register1.Address); - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); // Previous entry is no longer still valid, it should not be returned var lookup = _target.LookUpActivation(grainId); Assert.Null(lookup.Address); } - /// - /// Mock implementation of ISiloStatusOracle for testing. - /// The silo status oracle provides membership information about - /// which silos are alive, dead, or in other states. The grain - /// directory uses this to validate entries and make placement decisions. - /// - private class MockSiloStatusOracle : ISiloStatusOracle + [Theory] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public void ReturnEntryForTerminatingButNotDeadSilo(SiloStatus status) { - private readonly Dictionary _content = new(); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, status); - public MockSiloStatusOracle(SiloAddress siloAddress = null) - { - SiloAddress = siloAddress ?? LocalSiloAddress; - _content[SiloAddress] = SiloStatus.Active; - } + var grainId = GrainId.Create("testGrain", "myKey"); + var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); + + var register = _target.AddSingleActivation(grainAddress, previousAddress: null); + Assert.Equal(grainAddress, register.Address); + + var lookup = _target.LookUpActivation(grainId); + Assert.Equal(grainAddress, lookup.Address); + } - public SiloStatus CurrentStatus => SiloStatus.Active; + private sealed class MockClusterMembershipService : IClusterMembershipService + { + private readonly Dictionary _statuses = new(); + private long _version; - public string SiloName => "TestSilo"; + public ClusterMembershipSnapshot CurrentSnapshot { get; private set; } = + new(ImmutableDictionary.Empty, MembershipVersion.MinValue); - public SiloAddress SiloAddress { get; } + public IAsyncEnumerable MembershipUpdates => throw new NotImplementedException(); - public SiloStatus GetApproximateSiloStatus(SiloAddress siloAddress) + public void SetSiloStatus(SiloAddress siloAddress, SiloStatus status) { - if (_content.TryGetValue(siloAddress, out var status)) + _statuses[siloAddress] = (status, "TestSilo"); + var members = ImmutableDictionary.CreateBuilder(); + foreach (var (silo, entry) in _statuses) { - return status; + members[silo] = new ClusterMember(silo, entry.Status, entry.Name); } - return SiloStatus.None; - } - public Dictionary GetApproximateSiloStatuses(bool onlyActive = false) - { - return onlyActive - ? new Dictionary(_content.Where(kvp => kvp.Value == SiloStatus.Active)) - : new Dictionary(_content); + CurrentSnapshot = new ClusterMembershipSnapshot(members.ToImmutable(), new MembershipVersion(++_version)); } - public ImmutableArray GetActiveSilos() => _content.Keys.ToImmutableArray(); - - public void SetSiloStatus(SiloAddress siloAddress, SiloStatus status) => _content[siloAddress] = status; - - public bool IsDeadSilo(SiloAddress silo) => GetApproximateSiloStatus(silo) == SiloStatus.Dead; - - public bool IsFunctionalDirectory(SiloAddress siloAddress) => !GetApproximateSiloStatus(siloAddress).IsTerminating(); - - #region Not Implemented - public bool SubscribeToSiloStatusEvents(ISiloStatusListener observer) => throw new NotImplementedException(); - - public bool TryGetSiloName(SiloAddress siloAddress, out string siloName) => throw new NotImplementedException(); + public ValueTask Refresh(MembershipVersion minimumVersion = default, CancellationToken cancellationToken = default) => default; - public bool UnSubscribeFromSiloStatusEvents(ISiloStatusListener observer) => throw new NotImplementedException(); - #endregion + public Task TryKill(SiloAddress siloAddress) => throw new NotImplementedException(); } } diff --git a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs new file mode 100644 index 00000000000..3b820aec7d7 --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs @@ -0,0 +1,82 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Orleans.Runtime.GrainDirectory; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("GrainDirectory")] +public class LocalGrainDirectoryTests +{ + [Theory] + [InlineData(SiloStatus.Active)] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public void IsDefunctActivation_DoesNotRemoveNonDeadSilos(SiloStatus status) + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo); + var snapshot = CreateSnapshot(new ClusterMember(silo, status, "silo"), version: 2); + + Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesDeadSilos() + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(silo, SiloStatus.Dead, "silo"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_DoesNotRemoveUnknownSiloWithoutNewerMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesUnknownSiloWithOlderMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 1); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static GrainAddress CreateGrainAddress(SiloAddress siloAddress, long membershipVersion = 1) + => new() + { + GrainId = GrainId.Create("test-grain", Guid.NewGuid().ToString("N")), + ActivationId = ActivationId.NewId(), + SiloAddress = siloAddress, + MembershipVersion = new MembershipVersion(membershipVersion) + }; + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +}