diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index fa261ae209c..a57f35b8102 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -24,7 +24,6 @@ internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ILifec private readonly IClusterMembershipService clusterMembershipService; private readonly IInternalGrainFactory grainFactory; private readonly ActivationDirectory localActivations; - private readonly InsideRuntimeClient runtimeClient; private readonly IServiceProvider _serviceProvider; private readonly CancellationTokenSource _membershipUpdatesCancellation = new(); private DirectoryMembership directoryMembership = DirectoryMembership.Default; @@ -71,7 +70,6 @@ public LocalGrainDirectory( this.clusterMembershipService = clusterMembershipService; this.grainFactory = grainFactory; this.localActivations = systemTargetShared.ActivationDirectory; - this.runtimeClient = systemTargetShared.RuntimeClient; DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value, out this.disposeDirectoryCache); @@ -319,17 +317,6 @@ private List GetMembershipDifference( return result; } - private Task RefreshMembershipIfNewer(GrainAddress address, GrainAddress? previousAddress = null) - { - var targetVersion = address.MembershipVersion; - if (previousAddress is not null && previousAddress.MembershipVersion > targetVersion) - { - targetVersion = previousAddress.MembershipVersion; - } - - return RefreshMembershipIfNewer(targetVersion); - } - private Task RefreshMembershipIfNewer(List addresses) { var targetVersion = MembershipVersion.MinValue; @@ -366,11 +353,6 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr return; } - if (status == SiloStatus.Dead) - { - runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo); - } - var activationsToShutdown = new List(); var resolver = grainDirectoryResolver ??= _serviceProvider.GetRequiredService(); foreach (var activation in localActivations) @@ -460,24 +442,14 @@ private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembe } } - private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) + internal static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot) { if (address.SiloAddress is not { } silo) { return true; } - if (snapshot.Members.TryGetValue(silo, out var member)) - { - // If this is a known host, remove the activation if the host is dead. - return member.Status == SiloStatus.Dead; - } - - // If this is not a known host, remove the activation if it was registered at an older membership version. - // This indicates that the host must have been removed. - // Hosts cannot activate grains before they are active, and we ensure that we refresh the membership before processing messages, - // so this is a reliable indicator of a defunct activation. - return address.MembershipVersion < snapshot.Version; + return snapshot.GetSiloStatus(silo) == SiloStatus.Dead; } internal SiloAddress? FindPredecessor(SiloAddress silo) @@ -624,13 +596,13 @@ public async Task RegisterAsync(GrainAddress address, GrainAddres DirectoryInstruments.RegistrationsSingleActIssued.Add(1); } - await RefreshMembershipIfNewer(address, previousAddress); + await RefreshMembershipIfNewer(address.MembershipVersion); // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // After the first forward, we insert a retry delay and recheck owner before forwarding again + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync"); @@ -680,7 +652,7 @@ public async Task UnregisterAfterNonexistingActivation(GrainAddress addr, SiloAd { LogTraceUnregisterAfterNonexistingActivation(addr, origin); - await RefreshMembershipIfNewer(addr); + await RefreshMembershipIfNewer(addr.MembershipVersion); if (origin == null || this.directoryMembership.MembershipCache.Contains(origin)) { @@ -709,13 +681,13 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus if (hopCount == 0) InvalidateCacheEntry(address); - await RefreshMembershipIfNewer(address); + await RefreshMembershipIfNewer(address.MembershipVersion); // see if the owner is somewhere else (returns null if we are owner) var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); - // on all silos other than first, we insert a retry delay and recheck owner before forwarding - if (hopCount > 0 && forwardAddress != null) + // After the first forward, we insert a retry delay and recheck owner before forwarding again + if (hopCount > 1 && forwardAddress != null) { await Task.Delay(RETRY_DELAY); forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync"); diff --git a/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs b/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs index c276aaf0adf..4eb336b1415 100644 --- a/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs +++ b/src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs @@ -9,15 +9,18 @@ internal partial class SiloConnectionMaintainer : ILifecycleParticipant log; public SiloConnectionMaintainer( ConnectionManager connectionManager, ISiloStatusOracle siloStatusOracle, + IRuntimeClient runtimeClient, ILogger log) { this.connectionManager = connectionManager; this.siloStatusOracle = siloStatusOracle; + this.runtimeClient = runtimeClient; this.log = log; } @@ -42,6 +45,7 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta { if (status == SiloStatus.Dead && updatedSilo != siloStatusOracle.SiloAddress) { + this.runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo); _ = Task.Run(() => this.CloseConnectionAsync(updatedSilo)); } } @@ -68,4 +72,4 @@ private async Task CloseConnectionAsync(SiloAddress silo) )] private static partial void LogExceptionWhileClosingConnections(ILogger logger, SiloAddress siloAddress, Exception exception); } -} \ No newline at end of file +} diff --git a/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs new file mode 100644 index 00000000000..cc21b2e2ab9 --- /dev/null +++ b/test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs @@ -0,0 +1,71 @@ +using System.Collections.Immutable; +using System.Net; +using Orleans.Runtime; +using Orleans.Runtime.GrainDirectory; +using Xunit; + +namespace UnitTests; + +[TestCategory("BVT"), TestCategory("GrainDirectory")] +public class LocalGrainDirectoryTests +{ + [Theory] + [InlineData(SiloStatus.Active)] + [InlineData(SiloStatus.ShuttingDown)] + [InlineData(SiloStatus.Stopping)] + public void IsDefunctActivation_DoesNotRemoveNonDeadSilos(SiloStatus status) + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo); + var snapshot = CreateSnapshot(new ClusterMember(silo, status, "silo"), version: 2); + + Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesDeadSilos() + { + var silo = CreateSiloAddress(1); + var address = CreateGrainAddress(silo); + var snapshot = CreateSnapshot(new ClusterMember(silo, SiloStatus.Dead, "silo"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_DoesNotRemoveUnknownSiloUntilKnownDead() + { + var silo = CreateSiloAddress(1); + var unrelatedSilo = CreateSiloAddress(1, port: 11112); + var address = CreateGrainAddress(silo, membershipVersion: 1); + var snapshot = CreateSnapshot(new ClusterMember(unrelatedSilo, SiloStatus.Active, "other"), version: 2); + + Assert.False(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + [Fact] + public void IsDefunctActivation_RemovesSiloReplacedBySuccessor() + { + var silo = CreateSiloAddress(1); + var successor = CreateSiloAddress(2); + var address = CreateGrainAddress(silo, membershipVersion: 1); + var snapshot = CreateSnapshot(new ClusterMember(successor, SiloStatus.Active, "silo"), version: 2); + + Assert.True(LocalGrainDirectory.IsDefunctActivation(address, snapshot)); + } + + private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version) + => new(ImmutableDictionary.Empty.Add(member.SiloAddress, member), new MembershipVersion(version)); + + private static GrainAddress CreateGrainAddress(SiloAddress siloAddress, long membershipVersion = 1) + => new() + { + GrainId = GrainId.Create("test-grain", Guid.NewGuid().ToString("N")), + ActivationId = ActivationId.NewId(), + SiloAddress = siloAddress, + MembershipVersion = new MembershipVersion(membershipVersion) + }; + + private static SiloAddress CreateSiloAddress(int generation, int port = 11111) + => SiloAddress.New(new IPEndPoint(IPAddress.Loopback, port), generation); +} diff --git a/test/TestInfrastructure/TestExtensions/XunitLoggerProvider.cs b/test/TestInfrastructure/TestExtensions/XunitLoggerProvider.cs index 17a0539b9b4..fb69e243652 100644 --- a/test/TestInfrastructure/TestExtensions/XunitLoggerProvider.cs +++ b/test/TestInfrastructure/TestExtensions/XunitLoggerProvider.cs @@ -5,6 +5,8 @@ namespace TestExtensions { public class XunitLoggerProvider : ILoggerProvider { + private const string NoActiveTestExceptionMessage = "There is no currently active test."; + private readonly ITestOutputHelper output; public XunitLoggerProvider(ITestOutputHelper output) @@ -37,7 +39,15 @@ public void Dispose() { } public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) { - this.output.WriteLine($"{logLevel} [{this.category}.{eventId.Name ?? eventId.Id.ToString()}] {formatter(state, exception)}"); + var message = $"{logLevel} [{this.category}.{eventId.Name ?? eventId.Id.ToString()}] {formatter(state, exception)}"; + try + { + this.output.WriteLine(message); + } + catch (InvalidOperationException invalidOperationException) when (invalidOperationException.Message == NoActiveTestExceptionMessage) + { + Console.Error.WriteLine(message); + } } } }