diff --git a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs index 8314c99c0ed..32cd76dc558 100644 --- a/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs +++ b/src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs @@ -169,10 +169,10 @@ private async Task ListenToClusterChange() var updates = this.clusterMembershipService.MembershipUpdates.WithCancellation(this.shutdownToken.Token); await foreach (var snapshot in updates) { - // Active filtering: detect silos that went down and try to clean proactively the directory + // Active filtering: detect dead silos and try to clean proactively the directory var changes = snapshot.CreateUpdate(previousSnapshot).Changes; var deadSilos = changes - .Where(member => member.Status.IsTerminating()) + .Where(member => member.Status == SiloStatus.Dead) .Select(member => member.SiloAddress) .ToList(); @@ -187,6 +187,7 @@ private async Task ListenToClusterChange() } ((ITestAccessor)this).LastMembershipVersion = snapshot.Version; + previousSnapshot = snapshot; } } @@ -196,22 +197,16 @@ private bool IsKnownDeadSilo(GrainAddress grainAddress) private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membershipVersion) { var current = this.clusterMembershipService.CurrentSnapshot; - - // Check if the target silo is in the cluster - if (current.Members.TryGetValue(siloAddress, out var value)) - { - // It is, check if it's alive - return value.Status.IsTerminating(); - } - - // We didn't find it in the cluster. If the silo entry is too old, it has been cleaned in the membership table: the entry isn't valid anymore. - // Otherwise, maybe the membership service isn't up to date yet. The entry should be valid - return current.Version > membershipVersion; + return siloAddress is null || current.GetSiloStatus(siloAddress, membershipVersion) == SiloStatus.Dead; } private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}"); - public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0); + public void UpdateCache(GrainId grainId, SiloAddress siloAddress) + { + var membershipVersion = this.clusterMembershipService.CurrentSnapshot.Version; + cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress, MembershipVersion = membershipVersion }, (int)membershipVersion.Value); + } public void InvalidateCache(GrainId grainId) => cache.Remove(grainId); public void InvalidateCache(GrainAddress address) => cache.Remove(address); public bool TryLookupInCache(GrainId grainId, out GrainAddress address) @@ -222,10 +217,10 @@ public bool TryLookupInCache(GrainId grainId, out GrainAddress address) ThrowUnsupportedGrainType(grainId); } - if (this.cache.LookUp(grainId, out address, out var version)) + if (this.cache.LookUp(grainId, out address, out _)) { // If the silo is dead, remove the entry - if (IsKnownDeadSilo(address.SiloAddress, new MembershipVersion(version))) + if (IsKnownDeadSilo(address)) { address = default; this.cache.Remove(grainId); diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs index c5ba7fd21ac..6b36702dd37 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs @@ -17,24 +17,24 @@ internal sealed partial class GrainDirectoryHandoffManager private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(250); private readonly LocalGrainDirectory localDirectory; private readonly ISiloStatusOracle siloStatusOracle; + private readonly IClusterMembershipService clusterMembershipService; private readonly IInternalGrainFactory grainFactory; private readonly ILogger logger; - private readonly Factory createPartion; private readonly Queue<(string name, object state, Func action)> pendingOperations = new(); private readonly AsyncLock executorLock = new AsyncLock(); internal GrainDirectoryHandoffManager( LocalGrainDirectory localDirectory, ISiloStatusOracle siloStatusOracle, + IClusterMembershipService clusterMembershipService, IInternalGrainFactory grainFactory, - Factory createPartion, ILoggerFactory loggerFactory) { logger = loggerFactory.CreateLogger(); this.localDirectory = localDirectory; this.siloStatusOracle = siloStatusOracle; + this.clusterMembershipService = clusterMembershipService; this.grainFactory = grainFactory; - this.createPartion = createPartion; } internal void ProcessSiloAddEvent(SiloAddress addedSilo) @@ -128,9 +128,10 @@ private async Task AcceptExistingRegistrationsAsync(List singleAct { if (!this.localDirectory.Running) return; + var snapshot = this.clusterMembershipService.CurrentSnapshot; for (var i = singleActivations.Count - 1; i >= 0; i--) { - if (singleActivations[i].SiloAddress is not { } siloAddress || this.siloStatusOracle.GetApproximateSiloStatus(siloAddress) == SiloStatus.Dead) + if (!IsTransferableRegistration(singleActivations[i], snapshot)) { singleActivations.RemoveAt(i); } @@ -199,6 +200,16 @@ private async Task DestroyDuplicateActivationsAsync(Dictionary action) { lock (this) diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs index 4e008eadc98..7b6db82ce39 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.Interface.cs @@ -111,7 +111,8 @@ private GrainAddress RegisterCore(GrainAddress newAddress, GrainAddress? existin return existing; } - private bool IsSiloDead(GrainAddress existing) => _owner.ClusterMembershipSnapshot.GetSiloStatus(existing.SiloAddress) == SiloStatus.Dead; + private bool IsSiloDead(GrainAddress existing) + => existing.SiloAddress is null || _owner.ClusterMembershipSnapshot.GetSiloStatus(existing.SiloAddress, existing.MembershipVersion) == SiloStatus.Dead; [LoggerMessage( Level = LogLevel.Trace, diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index fa261ae209c..38ae57218a1 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -24,7 +24,6 @@ internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ILifec private readonly IClusterMembershipService clusterMembershipService; private readonly IInternalGrainFactory grainFactory; private readonly ActivationDirectory localActivations; - private readonly InsideRuntimeClient runtimeClient; private readonly IServiceProvider _serviceProvider; private readonly CancellationTokenSource _membershipUpdatesCancellation = new(); private DirectoryMembership directoryMembership = DirectoryMembership.Default; @@ -71,7 +70,6 @@ public LocalGrainDirectory( this.clusterMembershipService = clusterMembershipService; this.grainFactory = grainFactory; this.localActivations = systemTargetShared.ActivationDirectory; - this.runtimeClient = systemTargetShared.RuntimeClient; DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value, out this.disposeDirectoryCache); @@ -82,7 +80,7 @@ public LocalGrainDirectory( } DirectoryPartition = grainDirectoryPartitionFactory(); - HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, grainFactory, grainDirectoryPartitionFactory, loggerFactory); + HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, clusterMembershipService, grainFactory, loggerFactory); // When DistributedGrainDirectory is active, it registers its own IRemoteGrainDirectory system targets. // In that case, create the RemoteGrainDirectory objects (still needed for WorkItemGroup scheduling) @@ -319,17 +317,6 @@ private List GetMembershipDifference( return result; } - private Task RefreshMembershipIfNewer(GrainAddress address, GrainAddress? previousAddress = null) - { - var targetVersion = address.MembershipVersion; - if (previousAddress is not null && previousAddress.MembershipVersion > targetVersion) - { - targetVersion = previousAddress.MembershipVersion; - } - - return RefreshMembershipIfNewer(targetVersion); - } - private Task RefreshMembershipIfNewer(List addresses) { var targetVersion = MembershipVersion.MinValue; @@ -366,11 +353,6 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr return; } - if (status == SiloStatus.Dead) - { - runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo); - } - var activationsToShutdown = new List(); var resolver = grainDirectoryResolver ??= _serviceProvider.GetRequiredService(); foreach (var activation in localActivations) @@ -460,24 +442,14 @@ private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembe } } - private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) + internal static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) { if (address.SiloAddress is not { } silo) { return true; } - if (snapshot.Members.TryGetValue(silo, out var member)) - { - // If this is a known host, remove the activation if the host is dead. - return member.Status == SiloStatus.Dead; - } - - // If this is not a known host, remove the activation if it was registered at an older membership version. - // This indicates that the host must have been removed. - // Hosts cannot activate grains before they are active, and we ensure that we refresh the membership before processing messages, - // so this is a reliable indicator of a defunct activation. - return address.MembershipVersion < snapshot.Version; + return snapshot.GetSiloStatus(silo, address.MembershipVersion) == SiloStatus.Dead; } internal SiloAddress? FindPredecessor(SiloAddress silo) @@ -624,13 +596,13 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres DirectoryInstruments.RegistrationsSingleActIssued.Add(1); } - await RefreshMembershipIfNewer(address, previousAddress); + await RefreshMembershipIfNewer(address.MembershipVersion); // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // After the first forward, we insert a retry delay and recheck owner before forwarding again + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); @@ -667,7 +639,7 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres // this way next local lookup will find this ActivationAddress in the cache and we will save a full lookup! if (result.Address == null) return result; - if (!address.Equals(result.Address) || !IsValidSilo(address.SiloAddress)) return result; + if (!address.Equals(result.Address) || IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) return result; // update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup. DirectoryCache.AddOrUpdate(result.Address, result.VersionTag); @@ -680,7 +652,7 @@ public async Task UnregisterAfterNonexistingActivation(GrainAddress addr, SiloAd { LogTraceUnregisterAfterNonexistingActivation(addr, origin); - await RefreshMembershipIfNewer(addr); + await RefreshMembershipIfNewer(addr.MembershipVersion); if (origin == null || this.directoryMembership.MembershipCache.Contains(origin)) { @@ -709,13 +681,13 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus if (hopCount == 0) InvalidateCacheEntry(address); - await RefreshMembershipIfNewer(address); + await RefreshMembershipIfNewer(address.MembershipVersion); // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // After the first forward, we insert a retry delay and recheck owner before forwarding again + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); @@ -872,7 +844,15 @@ public bool LocalLookup(GrainId grain, out AddressAndTag result) public AddressAndTag GetLocalDirectoryData(GrainId grain) => DirectoryPartition.LookUpActivation(grain); - public GrainAddress? GetLocalCacheData(GrainId grain) => DirectoryCache.LookUp(grain, out var cache) && IsValidSilo(cache.SiloAddress) ? cache : null; + public GrainAddress? GetLocalCacheData(GrainId grain) + { + if (!DirectoryCache.LookUp(grain, out var cache)) + { + return null; + } + + return IsDefunctActivation(cache, clusterMembershipService.CurrentSnapshot) ? null : cache; + } public async Task LookupAsync(GrainId grainId, int hopCount = 0) { @@ -933,7 +913,7 @@ public async Task LookupAsync(GrainId grainId, int hopCount = 0) var result = await GetDirectoryReference(forwardAddress).LookupAsync(grainId, hopCount + 1); // update the cache - if (result.Address is { } address && IsValidSilo(address.SiloAddress)) + if (result.Address is { } address && !IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) { DirectoryCache.AddOrUpdate(address, result.VersionTag); } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs index f27d5e15c86..90c6d752f10 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs @@ -112,21 +112,22 @@ internal sealed partial class LocalGrainDirectoryPartition private Dictionary partitionData; private readonly object lockable; private readonly ILogger log; - private readonly ISiloStatusOracle siloStatusOracle; + private readonly IClusterMembershipService clusterMembershipService; private readonly IOptions grainDirectoryOptions; internal int Count { get { return partitionData.Count; } } - public LocalGrainDirectoryPartition(ISiloStatusOracle siloStatusOracle, IOptions grainDirectoryOptions, ILoggerFactory loggerFactory) + public LocalGrainDirectoryPartition(IClusterMembershipService clusterMembershipService, IOptions grainDirectoryOptions, ILoggerFactory loggerFactory) { partitionData = new Dictionary(); lockable = new object(); log = loggerFactory.CreateLogger(); - this.siloStatusOracle = siloStatusOracle; + this.clusterMembershipService = clusterMembershipService; this.grainDirectoryOptions = grainDirectoryOptions; } - private bool IsValidSilo(SiloAddress? silo) => silo is not null && siloStatusOracle.IsFunctionalDirectory(silo); + private bool IsDefunctActivation(GrainAddress? address) + => address is null || LocalGrainDirectory.IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot); internal void Clear() { @@ -156,9 +157,11 @@ internal AddressAndTag AddSingleActivation(GrainAddress address, GrainAddress? p { LogTraceAddingSingleActivation(address.SiloAddress, address.GrainId, address.ActivationId); - if (!IsValidSilo(address.SiloAddress)) + if (IsDefunctActivation(address)) { - var siloStatus = this.siloStatusOracle.GetApproximateSiloStatus(address.SiloAddress); + var siloStatus = address.SiloAddress is { } siloAddress + ? this.clusterMembershipService.CurrentSnapshot.GetSiloStatus(siloAddress, address.MembershipVersion) + : SiloStatus.None; throw new OrleansException($"Trying to register {address.GrainId} on invalid silo: {address.SiloAddress}. Known status: {siloStatus}"); } @@ -170,10 +173,8 @@ internal AddressAndTag AddSingleActivation(GrainAddress address, GrainAddress? p } else { - var siloAddress = grainInfo.Activation?.SiloAddress; - // If there is an existing entry pointing to an invalid silo then remove it - if (siloAddress != null && !IsValidSilo(siloAddress)) + if (IsDefunctActivation(grainInfo.Activation)) { partitionData[address.GrainId] = grainInfo = new GrainInfo(); } @@ -230,7 +231,7 @@ internal AddressAndTag LookUpActivation(GrainId grain) result = new(grainInfo.Activation, grainInfo.VersionTag); } - if (!IsValidSilo(result.Address?.SiloAddress)) + if (IsDefunctActivation(result.Address)) { result = new(null, result.VersionTag); } @@ -308,7 +309,7 @@ internal List Split(Predicate predicate) for (var i = result.Count - 1; i >= 0; i--) { - if (!IsValidSilo(result[i].SiloAddress)) + if (IsDefunctActivation(result[i])) { result.RemoveAt(i); } diff --git a/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs b/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs index e69dfbd7145..1e12c444632 100644 --- a/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs +++ b/src/Orleans.Runtime/MembershipService/ClusterMembershipSnapshot.cs @@ -61,6 +61,18 @@ public SiloStatus GetSiloStatus(SiloAddress silo) return status; } + /// + /// Gets status of the specified silo, treating unknown silos as dead if this snapshot is newer than when the silo was seen. + /// + /// The silo. + /// The membership version when the silo was last seen. + /// The status of the specified silo. + public SiloStatus GetSiloStatus(SiloAddress silo, MembershipVersion seenAtVersion) + { + var status = GetSiloStatus(silo); + return status == SiloStatus.None && this.Version > seenAtVersion ? SiloStatus.Dead : status; + } + /// /// Returns a which represents this instance. /// diff --git a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs index 2af9f4c3006..bc455ee8ab7 100644 --- a/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs +++ b/src/Orleans.Runtime/MembershipService/SiloStatusListenerManager.cs @@ -11,7 +11,7 @@ namespace Orleans.Runtime.MembershipService; /// /// Manages instances. /// -internal partial class SiloStatusListenerManager : ILifecycleParticipant +internal sealed partial class SiloStatusListenerManager : ILifecycleParticipant { #if NET9_0_OR_GREATER private readonly Lock _listenersLock = new(); diff --git a/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs b/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs index c276aaf0adf..4eb336b1415 100644 --- a/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs +++ b/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs @@ -9,15 +9,18 @@ internal partial class SiloConnectionMaintainer : ILifecycleParticipant log; public SiloConnectionMaintainer( ConnectionManager connectionManager, ISiloStatusOracle siloStatusOracle, + IRuntimeClient runtimeClient, ILogger log) { this.connectionManager = connectionManager; this.siloStatusOracle = siloStatusOracle; + this.runtimeClient = runtimeClient; this.log = log; } @@ -42,6 +45,7 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta { if (status == SiloStatus.Dead && updatedSilo != siloStatusOracle.SiloAddress) { + this.runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo); _ = Task.Run(() => this.CloseConnectionAsync(updatedSilo)); } } @@ -68,4 +72,4 @@ private async Task CloseConnectionAsync(SiloAddress silo) )] private static partial void LogExceptionWhileClosingConnections(ILogger logger, SiloAddress siloAddress, Exception exception); } -} \ No newline at end of file +} diff --git a/src/api/Orleans.Runtime/Orleans.Runtime.cs b/src/api/Orleans.Runtime/Orleans.Runtime.cs index c3577927a1d..19168727c55 100644 --- a/src/api/Orleans.Runtime/Orleans.Runtime.cs +++ b/src/api/Orleans.Runtime/Orleans.Runtime.cs @@ -547,6 +547,8 @@ public ClusterMembershipSnapshot(System.Collections.Immutable.ImmutableDictionar public SiloStatus GetSiloStatus(SiloAddress silo) { throw null; } + public SiloStatus GetSiloStatus(SiloAddress silo, MembershipVersion seenAtVersion) { throw null; } + public override string ToString() { throw null; } } diff --git a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs index e2778888f4d..6677f33ee60 100644 --- a/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs +++ b/test/Orleans.Core.Tests/Directory/CachedGrainLocatorTests.cs @@ -146,7 +146,7 @@ public async Task LocalGrainDirectoryStopDoesNotDisposeRegisteredCustomCache() .AddSingleton(cache) .BuildServiceProvider(); Factory partitionFactory = () => new LocalGrainDirectoryPartition( - siloStatusOracle, + membershipService.Target, Options.Create(new GrainDirectoryOptions()), this.loggerFactory); var systemTargetShared = new SystemTargetShared( @@ -197,7 +197,7 @@ public async Task LocalGrainDirectoryAppliesNewerMembershipBeforeRegisterForward grainFactory.GetSystemTarget(Constants.DirectoryServiceType, remoteSilo).Returns(remoteDirectory); var services = new ServiceCollection().BuildServiceProvider(); Factory partitionFactory = () => new LocalGrainDirectoryPartition( - siloStatusOracle, + membershipService.Target, Options.Create(new GrainDirectoryOptions()), this.loggerFactory); var systemTargetShared = new SystemTargetShared( @@ -454,6 +454,35 @@ public async Task LocalLookupWhenEntryExistsButSiloIsDead() await this.lifecycle.OnStop(); } + [Theory] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public async Task LocalLookupWhenCachedEntrySiloIsTerminatingButNotDead(SiloStatus status) + { + var silo = GenerateSiloAddress(); + + // Setup membership service + this.mockMembershipService.UpdateSiloStatus(silo, SiloStatus.Active, "silo"); + await this.lifecycle.OnStart(); + await WaitUntilClusterChangePropagated(); + + var address = GenerateGrainAddress(silo); + this.grainDirectory.Register(address, previousAddress: null).Returns(address); + + await this.grainLocator.Register(address, previousAddress: null); + Assert.True(this.grainLocator.TryLookupInCache(address.GrainId, out var cached)); + Assert.Equal(address, cached); + + this.mockMembershipService.UpdateSiloStatus(silo, status, "silo"); + await WaitUntilClusterChangePropagated(); + + Assert.True(this.grainLocator.TryLookupInCache(address.GrainId, out cached)); + Assert.Equal(address, cached); + await this.grainDirectory.DidNotReceive().UnregisterSilos(Arg.Any>()); + + await this.lifecycle.OnStop(); + } + /// /// Tests that the locator properly cleans up cached entries when a silo dies. /// This is critical for preventing requests from being sent to dead silos. @@ -502,6 +531,51 @@ await this.grainDirectory await this.lifecycle.OnStop(); } + [Fact] + public async Task CleanupWhenSiloIsDeadOnlyProcessesIncrementalChanges() + { + var expectedSilo = GenerateSiloAddress(); + var outdatedSilo = GenerateSiloAddress(); + + this.mockMembershipService.UpdateSiloStatus(expectedSilo, SiloStatus.Active, "exp"); + this.mockMembershipService.UpdateSiloStatus(outdatedSilo, SiloStatus.Active, "old"); + await this.lifecycle.OnStart(); + await WaitUntilClusterChangePropagated(); + + this.mockMembershipService.UpdateSiloStatus(outdatedSilo, SiloStatus.Dead, "old"); + await WaitUntilClusterChangePropagated(); + + await this.grainDirectory + .Received(1) + .UnregisterSilos(Arg.Is>(list => list.Count == 1 && list.Contains(outdatedSilo))); + + this.mockMembershipService.UpdateSiloStatus(expectedSilo, SiloStatus.Active, "exp2"); + await WaitUntilClusterChangePropagated(); + + await this.grainDirectory + .Received(1) + .UnregisterSilos(Arg.Is>(list => list.Count == 1 && list.Contains(outdatedSilo))); + + await this.lifecycle.OnStop(); + } + + [Fact] + public async Task UpdateCacheStampsCurrentMembershipVersion() + { + await this.lifecycle.OnStart(); + + var grainId = GrainId.Create(GrainType.Create("test"), GrainIdKeyExtensions.CreateGuidKey(Guid.NewGuid())); + var silo = GenerateSiloAddress(); + + this.grainLocator.UpdateCache(grainId, silo); + + Assert.True(this.grainLocator.TryLookupInCache(grainId, out var cached)); + Assert.Equal(silo, cached.SiloAddress); + Assert.Equal(this.mockMembershipService.CurrentVersion, cached.MembershipVersion); + + await this.lifecycle.OnStop(); + } + [Fact] public async Task UnregisterCallDirectoryAndCleanCache() { diff --git a/test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs b/test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs new file mode 100644 index 00000000000..2cd4b65b070 --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/ClusterMembershipSnapshotTests.cs @@ -0,0 +1,48 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("Membership")] +public class ClusterMembershipSnapshotTests +{ + [Fact] + public void GetSiloStatus_ReturnsDeadForUnknownSiloSeenAtOlderVersion() + { + var unknownSilo = CreateSiloAddress(1); + var knownSilo = CreateSiloAddress(1, port: 11112); + var snapshot = CreateSnapshot(new ClusterMember(knownSilo, SiloStatus.Active, "known"), version: 2); + + Assert.Equal(SiloStatus.Dead, snapshot.GetSiloStatus(unknownSilo, new MembershipVersion(1))); + } + + [Theory] + [InlineData(2)] + [InlineData(3)] + public void GetSiloStatus_ReturnsNoneForUnknownSiloSeenAtCurrentOrNewerVersion(long seenAtVersion) + { + var unknownSilo = CreateSiloAddress(1); + var knownSilo = CreateSiloAddress(1, port: 11112); + var snapshot = CreateSnapshot(new ClusterMember(knownSilo, SiloStatus.Active, "known"), version: 2); + + Assert.Equal(SiloStatus.None, snapshot.GetSiloStatus(unknownSilo, new MembershipVersion(seenAtVersion))); + } + + [Fact] + public void GetSiloStatus_ReturnsDeadForSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.Equal(SiloStatus.Dead, snapshot.GetSiloStatus(silo, new MembershipVersion(2))); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +} diff --git a/test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs new file mode 100644 index 00000000000..787179ab70b --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryHandoffManagerTests.cs @@ -0,0 +1,73 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Orleans.Runtime.GrainDirectory; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("GrainDirectory")] +public class GrainDirectoryHandoffManagerTests +{ + [Theory] + [InlineData(SiloStatus.Active, true)] + [InlineData(SiloStatus.ShuttingDown, true)] + [InlineData(SiloStatus.Stopping, true)] + [InlineData(SiloStatus.Dead, false)] + public void IsTransferableRegistration_UsesSnapshotStatus(SiloStatus status, bool expected) + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(silo, status, "silo"), version: 2); + + Assert.Equal(expected, GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + [Fact] + public void IsTransferableRegistration_AllowsUnknownSiloWithoutNewerMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.True(GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + [Fact] + public void IsTransferableRegistration_RejectsUnknownSiloWithOlderMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 1); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.False(GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + [Fact] + public void IsTransferableRegistration_RejectsSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.False(GrainDirectoryHandoffManager.IsTransferableRegistration(address, snapshot)); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static GrainAddress CreateGrainAddress(SiloAddress siloAddress, long membershipVersion) + => new() + { + GrainId = GrainId.Create("test-grain", Guid.NewGuid().ToString("N")), + ActivationId = ActivationId.NewId(), + SiloAddress = siloAddress, + MembershipVersion = new MembershipVersion(membershipVersion) + }; + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +} diff --git a/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs index 4f7582d2655..e9fe185227c 100644 --- a/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs +++ b/test/Orleans.Runtime.Internal.Tests/GrainDirectoryPartitionTests.cs @@ -31,15 +31,16 @@ namespace UnitTests; public class GrainDirectoryPartitionTests { private readonly LocalGrainDirectoryPartition _target; - private readonly MockSiloStatusOracle _siloStatusOracle; - private static readonly SiloAddress LocalSiloAddress = SiloAddress.FromParsableString("127.0.0.1:11111@123"); - private static readonly SiloAddress OtherSiloAddress = SiloAddress.FromParsableString("127.0.0.2:11111@456"); + private readonly MockClusterMembershipService _clusterMembershipService; + private static readonly SiloAddress LocalSiloAddress = SiloAddress.FromParsableString("127.0.0.1:11111@123"); + private static readonly SiloAddress OtherSiloAddress = SiloAddress.FromParsableString("127.0.0.2:11111@456"); public GrainDirectoryPartitionTests() { - _siloStatusOracle = new MockSiloStatusOracle(); + _clusterMembershipService = new MockClusterMembershipService(); + _clusterMembershipService.SetSiloStatus(LocalSiloAddress, SiloStatus.Active); _target = new LocalGrainDirectoryPartition( - _siloStatusOracle, + _clusterMembershipService, Options.Create(new GrainDirectoryOptions()), new LoggerFactory()); } @@ -53,7 +54,7 @@ public GrainDirectoryPartitionTests() [Fact] public void OverrideDeadEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var firstGrainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -62,7 +63,7 @@ public void OverrideDeadEntryTest() var firstRegister = _target.AddSingleActivation(firstGrainAddress, previousAddress: null); Assert.Equal(firstGrainAddress, firstRegister.Address); - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); // Previous entry is now pointing to a dead silo, it should be possible to override it now var secondGrainAddress = GrainAddress.NewActivationAddress(LocalSiloAddress, grainId); @@ -79,7 +80,7 @@ public void OverrideDeadEntryTest() [Fact] public void DoNotInsertInvalidEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -97,7 +98,7 @@ public void DoNotInsertInvalidEntryTest() [Fact] public void DoNotOverrideValidEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -121,7 +122,7 @@ public void DoNotOverrideValidEntryTest() [Fact] public void OverrideValidEntryIfMatchesTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -145,7 +146,7 @@ public void OverrideValidEntryIfMatchesTest() [Fact] public void DoNotOverrideValidEntryIfNoMatchTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -176,7 +177,7 @@ public void DoNotOverrideValidEntryIfNoMatchTest() [Fact] public void DoNotReturnInvalidEntryTest() { - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Active); var grainId = GrainId.Create("testGrain", "myKey"); var grainAddress1 = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); @@ -185,65 +186,54 @@ public void DoNotReturnInvalidEntryTest() var register1 = _target.AddSingleActivation(grainAddress1, previousAddress: null); Assert.Equal(grainAddress1, register1.Address); - _siloStatusOracle.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, SiloStatus.Dead); // Previous entry is no longer still valid, it should not be returned var lookup = _target.LookUpActivation(grainId); Assert.Null(lookup.Address); } - /// - /// Mock implementation of ISiloStatusOracle for testing. - /// The silo status oracle provides membership information about - /// which silos are alive, dead, or in other states. The grain - /// directory uses this to validate entries and make placement decisions. - /// - private class MockSiloStatusOracle : ISiloStatusOracle + [Theory] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public void ReturnEntryForTerminatingButNotDeadSilo(SiloStatus status) { - private readonly Dictionary _content = new(); + _clusterMembershipService.SetSiloStatus(OtherSiloAddress, status); - public MockSiloStatusOracle(SiloAddress siloAddress = null) - { - SiloAddress = siloAddress ?? LocalSiloAddress; - _content[SiloAddress] = SiloStatus.Active; - } + var grainId = GrainId.Create("testGrain", "myKey"); + var grainAddress = GrainAddress.NewActivationAddress(OtherSiloAddress, grainId); + + var register = _target.AddSingleActivation(grainAddress, previousAddress: null); + Assert.Equal(grainAddress, register.Address); + + var lookup = _target.LookUpActivation(grainId); + Assert.Equal(grainAddress, lookup.Address); + } - public SiloStatus CurrentStatus => SiloStatus.Active; + private sealed class MockClusterMembershipService : IClusterMembershipService + { + private readonly Dictionary _statuses = new(); + private long _version; - public string SiloName => "TestSilo"; + public ClusterMembershipSnapshot CurrentSnapshot { get; private set; } = + new(ImmutableDictionary.Empty, MembershipVersion.MinValue); - public SiloAddress SiloAddress { get; } + public IAsyncEnumerable MembershipUpdates => throw new NotImplementedException(); - public SiloStatus GetApproximateSiloStatus(SiloAddress siloAddress) + public void SetSiloStatus(SiloAddress siloAddress, SiloStatus status) { - if (_content.TryGetValue(siloAddress, out var status)) + _statuses[siloAddress] = (status, "TestSilo"); + var members = ImmutableDictionary.CreateBuilder(); + foreach (var (silo, entry) in _statuses) { - return status; + members[silo] = new ClusterMember(silo, entry.Status, entry.Name); } - return SiloStatus.None; - } - public Dictionary GetApproximateSiloStatuses(bool onlyActive = false) - { - return onlyActive - ? new Dictionary(_content.Where(kvp => kvp.Value == SiloStatus.Active)) - : new Dictionary(_content); + CurrentSnapshot = new ClusterMembershipSnapshot(members.ToImmutable(), new MembershipVersion(++_version)); } - public ImmutableArray GetActiveSilos() => _content.Keys.ToImmutableArray(); - - public void SetSiloStatus(SiloAddress siloAddress, SiloStatus status) => _content[siloAddress] = status; - - public bool IsDeadSilo(SiloAddress silo) => GetApproximateSiloStatus(silo) == SiloStatus.Dead; - - public bool IsFunctionalDirectory(SiloAddress siloAddress) => !GetApproximateSiloStatus(siloAddress).IsTerminating(); - - #region Not Implemented - public bool SubscribeToSiloStatusEvents(ISiloStatusListener observer) => throw new NotImplementedException(); - - public bool TryGetSiloName(SiloAddress siloAddress, out string siloName) => throw new NotImplementedException(); + public ValueTask Refresh(MembershipVersion minimumVersion = default, CancellationToken cancellationToken = default) => default; - public bool UnSubscribeFromSiloStatusEvents(ISiloStatusListener observer) => throw new NotImplementedException(); - #endregion + public Task TryKill(SiloAddress siloAddress) => throw new NotImplementedException(); } } diff --git a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs new file mode 100644 index 00000000000..3b820aec7d7 --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs @@ -0,0 +1,82 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Orleans.Runtime.GrainDirectory; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("GrainDirectory")] +public class LocalGrainDirectoryTests +{ + [Theory] + [InlineData(SiloStatus.Active)] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public void IsDefunctActivation_DoesNotRemoveNonDeadSilos(SiloStatus status) + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo); + var snapshot = CreateSnapshot(new ClusterMember(silo, status, "silo"), version: 2); + + Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesDeadSilos() + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(silo, SiloStatus.Dead, "silo"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_DoesNotRemoveUnknownSiloWithoutNewerMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesUnknownSiloWithOlderMembershipVersion() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 1); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var address = CreateGrainAddress(silo, membershipVersion: 2); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static GrainAddress CreateGrainAddress(SiloAddress siloAddress, long membershipVersion = 1) + => new() + { + GrainId = GrainId.Create("test-grain", Guid.NewGuid().ToString("N")), + ActivationId = ActivationId.NewId(), + SiloAddress = siloAddress, + MembershipVersion = new MembershipVersion(membershipVersion) + }; + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +} diff --git a/test/TestInfrastructure/TestExtensions/XunitLoggerProvider.cs b/test/TestInfrastructure/TestExtensions/XunitLoggerProvider.cs index 17a0539b9b4..fb69e243652 100644 --- a/test/TestInfrastructure/TestExtensions/XunitLoggerProvider.cs +++ b/test/TestInfrastructure/TestExtensions/XunitLoggerProvider.cs @@ -5,6 +5,8 @@ namespace TestExtensions { public class XunitLoggerProvider : ILoggerProvider { + private const string NoActiveTestExceptionMessage = "There is no currently active test."; + private readonly ITestOutputHelper output; public XunitLoggerProvider(ITestOutputHelper output) @@ -37,7 +39,15 @@ public void Dispose() { } public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) { - this.output.WriteLine($"{logLevel} [{this.category}.{eventId.Name ?? eventId.Id.ToString()}] {formatter(state, exception)}"); + var message = $"{logLevel} [{this.category}.{eventId.Name ?? eventId.Id.ToString()}] {formatter(state, exception)}"; + try + { + this.output.WriteLine(message); + } + catch (InvalidOperationException invalidOperationException) when (invalidOperationException.Message == NoActiveTestExceptionMessage) + { + Console.Error.WriteLine(message); + } } } }