diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index 0eff3079ee0..dfca80b796d 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -11,9 +11,7 @@ namespace Orleans.Runtime { internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant { - private readonly SiloAddress _siloAddress; private readonly ActivationCollector activationCollector; - private readonly GrainDirectoryResolver grainDirectoryResolver; private readonly ActivationDirectory activations; private readonly IServiceProvider serviceProvider; private readonly ILogger logger; @@ -30,8 +28,6 @@ internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecyclePartic #endif public Catalog( - ILocalSiloDetails localSiloDetails, - GrainDirectoryResolver grainDirectoryResolver, ActivationDirectory activationDirectory, ActivationCollector activationCollector, IServiceProvider serviceProvider, @@ -40,8 +36,6 @@ public Catalog( SystemTargetShared shared) : base(Constants.CatalogType, shared) { - this._siloAddress = localSiloDetails.SiloAddress; - this.grainDirectoryResolver = grainDirectoryResolver; this.activations = activationDirectory; this.serviceProvider = serviceProvider; this.grainActivator = grainActivator; @@ -328,99 +322,12 @@ await Parallel.ForEachAsync(addresses, (activationAddress, cancellationToken) => }); } - // TODO move this logic in the LocalGrainDirectory - internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress updatedSilo, SiloStatus status) - { - // ignore joining events and also events on myself. - if (updatedSilo.Equals(_siloAddress)) return; - - // We deactivate those activations when silo goes either of ShuttingDown/Stopping/Dead states, - // since this is what Directory is doing as well. Directory removes a silo based on all those 3 statuses, - // thus it will only deliver a "remove" notification for a given silo once to us. 
Therefore, we need to react the fist time we are notified. - // We may review the directory behavior in the future and treat ShuttingDown differently ("drain only") and then this code will have to change a well. - if (!status.IsTerminating()) return; - if (status == SiloStatus.Dead) - { - this.RuntimeClient.BreakOutstandingMessagesToSilo(updatedSilo); - } - - var activationsToShutdown = new List(); - try - { - // scan all activations in activation directory and deactivate the ones that the removed silo is their primary partition owner. - // Note: No lock needed here since ActivationDirectory uses ConcurrentDictionary which provides thread-safe enumeration - foreach (var activation in activations) - { - try - { - var activationData = activation.Value; - var placementStrategy = activationData.GetComponent(); - var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true }; - if (!isUsingGrainDirectory || !grainDirectoryResolver.IsUsingDefaultDirectory(activationData.GrainId.Type)) continue; - if (!updatedSilo.Equals(directory.GetPrimaryForGrain(activationData.GrainId))) continue; - - activationsToShutdown.Add(activationData); - } - catch (Exception exc) - { - LogErrorCatalogSiloStatusChangeNotification(new(updatedSilo), exc); - } - } - - if (activationsToShutdown.Count > 0) - { - LogInfoCatalogSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo)); - } - } - finally - { - // outside the lock. 
- if (activationsToShutdown.Count > 0) - { - var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration."; - var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText); - StartDeactivatingActivations(reason, activationsToShutdown, CancellationToken.None); - } - } - - void StartDeactivatingActivations(DeactivationReason reason, List list, CancellationToken cancellationToken) - { - if (list == null || list.Count == 0) return; - - LogDebugDeactivateActivations(list.Count); - - foreach (var activation in list) - { - activation.Deactivate(reason, cancellationToken); - } - } - } - void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) { // Do nothing, just ensure that this instance is created so that it can register itself in the activation directory. _siloStatusOracle = serviceProvider.GetRequiredService(); } - private readonly struct SiloAddressLogValue(SiloAddress silo) - { - public override string ToString() => silo.ToStringWithHashCode(); - } - - [LoggerMessage( - Level = LogLevel.Error, - EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception, - Message = "Catalog has thrown an exception while handling removal of silo {Silo}" - )] - private partial void LogErrorCatalogSiloStatusChangeNotification(SiloAddressLogValue silo, Exception exception); - - [LoggerMessage( - Level = LogLevel.Information, - EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification, - Message = "Catalog is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids." 
- )] - private partial void LogInfoCatalogSiloStatusChangeNotification(int count, SiloAddressLogValue silo); - [LoggerMessage( Level = LogLevel.Trace, Message = "Unregistered activation {Activation}")] diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs index 2f8d58c5e41..bac6eb68558 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs @@ -15,7 +15,6 @@ namespace Orleans.Runtime.GrainDirectory internal sealed partial class GrainDirectoryHandoffManager { private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(250); - private const int MAX_OPERATION_DEQUEUE = 2; private readonly LocalGrainDirectory localDirectory; private readonly ISiloStatusOracle siloStatusOracle; private readonly IInternalGrainFactory grainFactory; @@ -71,6 +70,11 @@ internal void ProcessSiloAddEvent(SiloAddress addedSilo) private async Task ProcessAddedSiloAsync(SiloAddress addedSilo, List splitPartListSingle) { if (!this.localDirectory.Running) return; + if (!addedSilo.Equals(localDirectory.FindSuccessor(localDirectory.MyAddress))) + { + LogDebugNotImmediateSuccessor(logger, addedSilo); + return; + } if (this.siloStatusOracle.GetApproximateSiloStatus(addedSilo) == SiloStatus.Active) { @@ -124,6 +128,16 @@ private async Task AcceptExistingRegistrationsAsync(List singleAct { if (!this.localDirectory.Running) return; + for (var i = singleActivations.Count - 1; i >= 0; i--) + { + if (singleActivations[i].SiloAddress is not { } siloAddress || this.siloStatusOracle.GetApproximateSiloStatus(siloAddress).IsTerminating()) + { + singleActivations.RemoveAt(i); + } + } + + if (singleActivations.Count == 0) return; + LogDebugAcceptingRegistrations(logger, singleActivations.Count); var tasks = singleActivations.Select(addr => this.localDirectory.RegisterAsync(addr, previousAddress: null, 
1)).ToArray(); @@ -201,7 +215,6 @@ private async Task ExecutePendingOperations() { using (await executorLock.LockAsync()) { - var dequeueCount = 0; while (true) { // Get the next operation, or exit if there are none. @@ -213,34 +226,23 @@ private async Task ExecutePendingOperations() op = this.pendingOperations.Peek(); } - dequeueCount++; - try { await op.Action(this, op.State); - // Success, reset the dequeue count - dequeueCount = 0; - } - catch (Exception exception) - { - if (dequeueCount < MAX_OPERATION_DEQUEUE) - { - LogWarningOperationFailedRetry(logger, exception, op.Name); - await Task.Delay(RetryDelay); - } - else + lock (this) { - LogWarningOperationFailedNoRetry(logger, exception, op.Name); + this.pendingOperations.Dequeue(); } } - if (dequeueCount == 0 || dequeueCount >= MAX_OPERATION_DEQUEUE) + catch (Exception exception) { - lock (this) + if (!this.localDirectory.Running) { - // Remove the operation from the queue if it was a success - // or if we tried too many times - this.pendingOperations.Dequeue(); + return; } + + LogWarningOperationFailedRetry(logger, exception, op.Name); + await Task.Delay(RetryDelay); } } } @@ -335,10 +337,5 @@ private readonly struct GrainAddressesLogValue(List grainAddresses )] private static partial void LogWarningOperationFailedRetry(ILogger logger, Exception exception, string operation); - [LoggerMessage( - Level = LogLevel.Warning, - Message = "{Operation} failed, will NOT be retried" - )] - private static partial void LogWarningOperationFailedNoRetry(ILogger logger, Exception exception, string operation); } } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 7c97e15c82b..154d5dcc36d 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -10,29 +10,32 @@ using Microsoft.Extensions.Options; using Orleans.Configuration; using Orleans.GrainDirectory; +using 
Orleans.Internal; using Orleans.Runtime.Scheduler; namespace Orleans.Runtime.GrainDirectory { - internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ISiloStatusListener, ILifecycleParticipant + internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ILifecycleParticipant { private readonly ILogger log; private readonly SiloAddress? seed; private readonly ISiloStatusOracle siloStatusOracle; + private readonly IClusterMembershipService clusterMembershipService; private readonly IInternalGrainFactory grainFactory; -#if NET9_0_OR_GREATER - private readonly Lock writeLock = new(); -#else - private readonly object writeLock = new(); -#endif + private readonly ActivationDirectory localActivations; + private readonly InsideRuntimeClient runtimeClient; private readonly IServiceProvider _serviceProvider; + private readonly CancellationTokenSource _membershipUpdatesCancellation = new(); private DirectoryMembership directoryMembership = DirectoryMembership.Default; + private ClusterMembershipSnapshot appliedClusterMembershipSnapshot = ClusterMembershipSnapshot.Default; + private GrainDirectoryResolver? grainDirectoryResolver; + private bool hasAppliedClusterMembershipSnapshot; // Consider: move these constants into an appropriate place internal const int HOP_LIMIT = 6; // forward a remote request no more than 5 times public static readonly TimeSpan RETRY_DELAY = TimeSpan.FromMilliseconds(200); // Pause 200ms between forwards to let the membership directory settle down internal bool Running; - private Catalog? _catalog; + private Task? 
membershipUpdatesTask; internal SiloAddress MyAddress { get; } @@ -51,6 +54,7 @@ public LocalGrainDirectory( IServiceProvider serviceProvider, ILocalSiloDetails siloDetails, ISiloStatusOracle siloStatusOracle, + IClusterMembershipService clusterMembershipService, IInternalGrainFactory grainFactory, Factory grainDirectoryPartitionFactory, IOptions developmentClusterMembershipOptions, @@ -63,7 +67,10 @@ public LocalGrainDirectory( MyAddress = siloDetails.SiloAddress; this.siloStatusOracle = siloStatusOracle; + this.clusterMembershipService = clusterMembershipService; this.grainFactory = grainFactory; + this.localActivations = systemTargetShared.ActivationDirectory; + this.runtimeClient = systemTargetShared.RuntimeClient; DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value, out this.disposeDirectoryCache); @@ -96,8 +103,7 @@ public LocalGrainDirectory( DistributedGrainDirectoryPartitionCompatibilities = compatibilityPartitions.MoveToImmutable(); } - // add myself to the list of members - AddServer(MyAddress); + this.directoryMembership = DirectoryMembership.Create([MyAddress]); DirectoryInstruments.RegisterDirectoryPartitionSizeObserve(() => DirectoryPartition.Count); DirectoryInstruments.RegisterMyPortionRingDistanceObserve(() => RingDistanceToSuccessor()); @@ -116,8 +122,7 @@ public void Start() LogDebugStart(); Running = true; - - siloStatusOracle.SubscribeToSiloStatusEvents(this); + membershipUpdatesTask = Task.Run(() => ProcessMembershipUpdates(_membershipUpdatesCancellation.Token)); } // Note that this implementation stops processing directory change requests (Register, Unregister, etc.) when the Stop event is raised. 
@@ -135,139 +140,276 @@ public async Task StopAsync() //mark Running as false will exclude myself from CalculateGrainDirectoryPartition(grainId) Running = false; + _membershipUpdatesCancellation.Cancel(); + if (membershipUpdatesTask is { } task) + { + await task.SuppressThrowing(); + } if (this.disposeDirectoryCache) { - await GrainDirectoryCacheFactory.DisposeGrainDirectoryCacheAsync(DirectoryCache); + await GrainDirectoryCacheFactory.DisposeGrainDirectoryCacheAsync(DirectoryCache).SuppressThrowing(); } DirectoryPartition.Clear(); DirectoryCache.Clear(); } - private void AddServer(SiloAddress silo) + private async Task ProcessMembershipUpdates(CancellationToken cancellationToken) { - lock (this.writeLock) + var snapshot = clusterMembershipService.CurrentSnapshot; + while (!cancellationToken.IsCancellationRequested) { - var existing = this.directoryMembership; - if (existing.MembershipCache.Contains(silo)) + try + { + await ApplyMembershipSnapshot(snapshot); + + await foreach (var update in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken)) + { + snapshot = update; + await ApplyMembershipSnapshot(snapshot); + } + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception exception) { - // we have already cached this silo - return; + LogErrorProcessingMembershipUpdates(exception); + + try + { + await clusterMembershipService.Refresh(cancellationToken: cancellationToken); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + break; + } + catch (Exception refreshException) + { + LogWarningRefreshingClusterMembershipFailed(refreshException); + } + + snapshot = clusterMembershipService.CurrentSnapshot; + await Task.Delay(RETRY_DELAY, cancellationToken).SuppressThrowing(); } + } + } - // insert new silo in the sorted order - long hash = silo.GetConsistentHashCode(); + private Task ApplyMembershipSnapshot(ClusterMembershipSnapshot 
snapshot) + { + return CacheValidator.RunOrQueueTask(() => + { + ApplyMembershipSnapshotCore(snapshot); + return Task.CompletedTask; + }); + } - // Find the last silo with hash smaller than the new silo, and insert the latter after (this is why we have +1 here) the former. - // Notice that FindLastIndex might return -1 if this should be the first silo in the list, but then - // 'index' will get 0, as needed. - int index = existing.MembershipRingList.FindLastIndex(siloAddr => siloAddr.GetConsistentHashCode() < hash) + 1; + private void ApplyMembershipSnapshotCore(ClusterMembershipSnapshot snapshot) + { + if (!Running) + { + return; + } - this.directoryMembership = new DirectoryMembership( - existing.MembershipRingList.Insert(index, silo), - existing.MembershipCache.Add(silo)); + var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default; + if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version) + { + return; + } - HandoffManager.ProcessSiloAddEvent(silo); + var previousMembership = CreateDirectoryMembership(previousSnapshot); + var targetMembership = CreateDirectoryMembership(snapshot); + this.directoryMembership = targetMembership; - AdjustLocalDirectory(silo, dead: false); - AdjustLocalCache(silo, dead: false); + var removedSilos = GetMembershipDifference(previousMembership, targetMembership); + var addedSilos = GetMembershipDifference(targetMembership, previousMembership); + ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership); + AdjustLocalDirectory(targetMembership); + AdjustLocalCache(targetMembership); + + foreach (var silo in removedSilos) + { + LogDebugSiloRemovedSilo(MyAddress, silo); + } + + foreach (var silo in addedSilos) + { + HandoffManager.ProcessSiloAddEvent(silo); LogDebugSiloAddedSilo(MyAddress, silo); } + + appliedClusterMembershipSnapshot = snapshot; + hasAppliedClusterMembershipSnapshot = true; } - private void 
RemoveServer(SiloAddress silo, SiloStatus status) + private void ProcessSiloStatusChanges( + ClusterMembershipSnapshot snapshot, + ClusterMembershipSnapshot previousSnapshot, + DirectoryMembership previousMembership) { - lock (this.writeLock) + var changes = previousSnapshot.Version != MembershipVersion.MinValue + ? snapshot.CreateUpdate(previousSnapshot) + : snapshot.AsUpdate(); + var statusChanges = new List(); + foreach (var change in changes.Changes) { - try + if (!change.SiloAddress.Equals(MyAddress) && change.Status.IsTerminating()) { - // Only notify the catalog once. Order is important: call BEFORE updating membershipRingList. - _catalog = _serviceProvider.GetRequiredService(); - _catalog.OnSiloStatusChange(this, silo, status); + statusChanges.Add(change); } - catch (Exception exc) + } + + statusChanges.Sort(static (left, right) => CompareSiloAddress(left.SiloAddress, right.SiloAddress)); + foreach (var change in statusChanges) + { + OnSiloStatusChange(previousMembership, change.SiloAddress, change.Status); + } + } + + private DirectoryMembership CreateDirectoryMembership(ClusterMembershipSnapshot snapshot) + { + var members = new List(); + foreach (var member in snapshot.Members) + { + if (member.Value.Status == SiloStatus.Active) { - LogErrorCatalogSiloStatusChangeNotificationException(exc, new(silo)); + members.Add(member.Key); } + } + + if (Running && !members.Contains(MyAddress)) + { + members.Add(MyAddress); + } + + return DirectoryMembership.Create(members); + } - var existing = this.directoryMembership; - if (!existing.MembershipCache.Contains(silo)) + private List GetMembershipDifference( + DirectoryMembership currentMembership, + DirectoryMembership otherMembership) + { + var result = new List(); + foreach (var silo in currentMembership.MembershipRingList) + { + if (!silo.Equals(MyAddress) && !otherMembership.MembershipCache.Contains(silo)) { - // we have already removed this silo - return; + result.Add(silo); } + } - this.directoryMembership = 
new DirectoryMembership( - existing.MembershipRingList.Remove(silo), - existing.MembershipCache.Remove(silo)); + return result; + } - AdjustLocalDirectory(silo, dead: true); - AdjustLocalCache(silo, dead: true); + private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddress updatedSilo, SiloStatus status) + { + if (updatedSilo.Equals(MyAddress) || !status.IsTerminating()) + { + return; + } - LogDebugSiloRemovedSilo(MyAddress, silo); + if (status == SiloStatus.Dead) + { + runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo); + } + + var activationsToShutdown = new List(); + var resolver = grainDirectoryResolver ??= _serviceProvider.GetRequiredService(); + foreach (var activation in localActivations) + { + try + { + var activationData = activation.Value; + var placementStrategy = activationData.GetComponent(); + var isUsingGrainDirectory = placementStrategy is { IsUsingGrainDirectory: true }; + if (!isUsingGrainDirectory || !resolver.IsUsingDefaultDirectory(activationData.GrainId.Type)) + { + continue; + } + + if (!updatedSilo.Equals(CalculateGrainDirectoryPartition(activationData.GrainId, previousMembership))) + { + continue; + } + + activationsToShutdown.Add(activationData); + } + catch (Exception exception) + { + LogErrorSiloStatusChangeNotification(new(updatedSilo), exception); + } + } + + if (activationsToShutdown.Count == 0) + { + return; + } + + LogInfoSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo)); + + var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration."; + var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText); + foreach (var activation in activationsToShutdown) + { + try + { + activation.Deactivate(reason, CancellationToken.None); + } + catch (Exception exception) + { + LogErrorDeactivatingActivationForRemovedSilo(exception, activation.GrainId, 
new(updatedSilo)); + } } } - /// - /// Adjust local directory following the addition/removal of a silo - /// - private void AdjustLocalDirectory(SiloAddress silo, bool dead) + private void AdjustLocalDirectory(DirectoryMembership targetMembership) { - // Determine which activations to remove. var activationsToRemove = new List<(GrainId, ActivationId)>(); foreach (var entry in this.DirectoryPartition.GetItems()) { if (entry.Value.Activation is { } address) { - // Include any activations from dead silos and from predecessors. - if (dead && address.SiloAddress!.Equals(silo) || address.SiloAddress!.IsPredecessorOf(silo)) + if (IsDefunctActivationSilo(targetMembership, address.SiloAddress)) { activationsToRemove.Add((entry.Key, address.ActivationId)); } } } - // Remove all defunct activations. foreach (var activation in activationsToRemove) { DirectoryPartition.RemoveActivation(activation.Item1, activation.Item2); } } - /// Adjust local cache following the removal of a silo by dropping: - /// 1) entries that point to activations located on the removed silo - /// 2) entries for grains that are now owned by this silo (me) - /// 3) entries for grains that were owned by this removed silo - we currently do NOT do that. - /// If we did 3, we need to do that BEFORE we change the membershipRingList (based on old Membership). - /// We don't do that since first cache refresh handles that. - /// Second, since Membership events are not guaranteed to be ordered, we may remove a cache entry that does not really point to a failed silo. - /// To do that properly, we need to store for each cache entry who was the directory owner that registered this activation (the original partition owner). 
- private void AdjustLocalCache(SiloAddress silo, bool dead) + private void AdjustLocalCache(DirectoryMembership targetMembership) { - // remove all records of activations located on the removed silo foreach (var tuple in DirectoryCache.KeyValues) { var activationAddress = tuple.ActivationAddress; - // 2) remove entries now owned by me (they should be retrieved from my directory partition) - if (MyAddress.Equals(CalculateGrainDirectoryPartition(activationAddress.GrainId))) + // Remove entries now owned by me. They should be retrieved from my directory partition. + if (MyAddress.Equals(CalculateGrainDirectoryPartition(activationAddress.GrainId, targetMembership))) { DirectoryCache.Remove(activationAddress.GrainId); continue; } - // 1) remove entries that point to activations located on the removed silo - // For dead silos, remove any activation registered to that silo or one of its predecessors. - // For new silos, remove any activation registered to one of its predecessors. - if (activationAddress.SiloAddress!.IsPredecessorOf(silo) || dead && activationAddress.SiloAddress.Equals(silo)) + if (IsDefunctActivationSilo(targetMembership, activationAddress.SiloAddress)) { DirectoryCache.Remove(activationAddress.GrainId); } } } + private static bool IsDefunctActivationSilo(DirectoryMembership targetMembership, SiloAddress? silo) + { + return silo is null || !targetMembership.MembershipCache.Contains(silo); + } + internal SiloAddress? FindPredecessor(SiloAddress silo) { var existing = directoryMembership.MembershipRingList; @@ -294,24 +436,6 @@ private void AdjustLocalCache(SiloAddress silo, bool dead) return existing.Count > 1 ? 
existing[(index + 1) % existing.Count] : null; } - public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus status) - { - // This silo's status has changed - if (!Equals(updatedSilo, MyAddress)) // Status change for some other silo - { - if (status.IsTerminating()) - { - // QueueAction up the "Remove" to run on a system turn - CacheValidator.WorkItemGroup.QueueAction(() => RemoveServer(updatedSilo, status)); - } - else if (status == SiloStatus.Active) // do not do anything with SiloStatus.Starting -- wait until it actually becomes active - { - // QueueAction up the "Remove" to run on a system turn - CacheValidator.WorkItemGroup.QueueAction(() => AddServer(updatedSilo)); - } - } - } - private bool IsValidSilo(SiloAddress? silo) => siloStatusOracle.IsFunctionalDirectory(silo); /// @@ -321,6 +445,9 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta /// /// public SiloAddress? CalculateGrainDirectoryPartition(GrainId grainId) + => CalculateGrainDirectoryPartition(grainId, this.directoryMembership); + + private SiloAddress? CalculateGrainDirectoryPartition(GrainId grainId, DirectoryMembership membership) { // give a special treatment for special grains if (grainId.IsSystemTarget()) @@ -352,7 +479,7 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta // is doing something valuable. bool excludeMySelf = !Running; - var existing = this.directoryMembership; + var existing = membership; if (existing.MembershipRingList.Count == 0) { // If the membership ring is empty, then we're the owner by default unless we're stopping. 
@@ -812,6 +939,12 @@ private bool IsSiloNextInTheRing(SiloAddress siloAddr, int hash, bool excludeMyS return siloAddr.GetConsistentHashCode() <= hash && (!excludeMySelf || !siloAddr.Equals(MyAddress)); } + private static int CompareSiloAddress(SiloAddress left, SiloAddress right) + { + var hashComparison = left.GetConsistentHashCode().CompareTo(right.GetConsistentHashCode()); + return hashComparison != 0 ? hashComparison : left.CompareTo(right); + } + public bool IsSiloInCluster(SiloAddress silo) { return this.directoryMembership.MembershipCache.Contains(silo); @@ -852,11 +985,37 @@ private readonly struct SiloHashLogValue(SiloAddress? silo) private partial void LogDebugSiloAddedSilo(SiloAddress siloAddress, SiloAddress otherSiloAddress); [LoggerMessage( - EventId = (int)ErrorCode.Directory_SiloStatusChangeNotification_Exception, Level = LogLevel.Error, - Message = "CatalogSiloStatusListener.SiloStatusChangeNotification has thrown an exception when notified about removed silo {Silo}." + EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception, + Message = "LocalGrainDirectory has thrown an exception while handling removal of silo {Silo}." )] - private partial void LogErrorCatalogSiloStatusChangeNotificationException(Exception exception, SiloAddressLogValue silo); + private partial void LogErrorSiloStatusChangeNotification(SiloAddressLogValue silo, Exception exception); + + [LoggerMessage( + Level = LogLevel.Information, + EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification, + Message = "LocalGrainDirectory is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids." 
+ )] + private partial void LogInfoSiloStatusChangeNotification(int count, SiloAddressLogValue silo); + + [LoggerMessage( + Level = LogLevel.Error, + EventId = (int)ErrorCode.Directory_SiloStatusChangeNotification_Exception, + Message = "LocalGrainDirectory has thrown an exception while deactivating activation {GrainId} due to removal of silo {Silo}." + )] + private partial void LogErrorDeactivatingActivationForRemovedSilo(Exception exception, GrainId grainId, SiloAddressLogValue silo); + + [LoggerMessage( + Level = LogLevel.Error, + Message = "Error processing cluster membership updates." + )] + private partial void LogErrorProcessingMembershipUpdates(Exception exception); + + [LoggerMessage( + Level = LogLevel.Warning, + Message = "Failed to refresh cluster membership after a directory membership update processing error." + )] + private partial void LogWarningRefreshingClusterMembershipFailed(Exception exception); [LoggerMessage( Level = LogLevel.Debug, @@ -984,6 +1143,14 @@ public DirectoryMembership(ImmutableList<SiloAddress> membershipRingList, Immuta public static DirectoryMembership Default { get; } = new DirectoryMembership(ImmutableList<SiloAddress>.Empty, ImmutableHashSet<SiloAddress>.Empty); + public static DirectoryMembership Create(IEnumerable<SiloAddress> members) + { + var builder = ImmutableList.CreateBuilder<SiloAddress>(); + builder.AddRange(members); + builder.Sort(LocalGrainDirectory.CompareSiloAddress); + var ring = builder.ToImmutable(); + return new DirectoryMembership(ring, ring.ToImmutableHashSet()); + } public ImmutableList<SiloAddress> MembershipRingList { get; } public ImmutableHashSet<SiloAddress> MembershipCache { get; } diff --git a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs index db66ed56d2c..066bbb1aa4d 100644 --- a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs +++ b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs @@ -140,6 +140,7 @@ public async Task LocalGrainDirectoryStopDoesNotDisposeRegisteredCustomCache() 
localSiloDetails.Name.Returns("TestSilo"); localSiloDetails.ClusterId.Returns("TestCluster"); var siloStatusOracle = Substitute.For<ISiloStatusOracle>(); + var membershipService = new MockClusterMembershipService(); var grainFactory = Substitute.For<IInternalGrainFactory>(); var services = new ServiceCollection() .AddSingleton(cache) @@ -160,6 +161,7 @@ public async Task LocalGrainDirectoryStopDoesNotDisposeRegisteredCustomCache() serviceProvider: services, siloDetails: localSiloDetails, siloStatusOracle: siloStatusOracle, + clusterMembershipService: membershipService.Target, grainFactory: grainFactory, grainDirectoryPartitionFactory: partitionFactory, developmentClusterMembershipOptions: Options.Create(new DevelopmentClusterMembershipOptions()),