Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 9 additions & 37 deletions src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ILifec
private readonly IClusterMembershipService clusterMembershipService;
private readonly IInternalGrainFactory grainFactory;
private readonly ActivationDirectory localActivations;
private readonly InsideRuntimeClient runtimeClient;
private readonly IServiceProvider _serviceProvider;
private readonly CancellationTokenSource _membershipUpdatesCancellation = new();
private DirectoryMembership directoryMembership = DirectoryMembership.Default;
Expand Down Expand Up @@ -71,7 +70,6 @@ public LocalGrainDirectory(
this.clusterMembershipService = clusterMembershipService;
this.grainFactory = grainFactory;
this.localActivations = systemTargetShared.ActivationDirectory;
this.runtimeClient = systemTargetShared.RuntimeClient;

DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value, out this.disposeDirectoryCache);

Expand Down Expand Up @@ -319,17 +317,6 @@ private List<SiloAddress> GetMembershipDifference(
return result;
}

/// <summary>
/// Picks the greater of the membership versions carried by <paramref name="address"/> and,
/// when one is supplied, <paramref name="previousAddress"/>, and forwards it to the
/// version-based <c>RefreshMembershipIfNewer</c> overload.
/// </summary>
/// <param name="address">The address whose membership version is always considered.</param>
/// <param name="previousAddress">An optional second address; its version is used only when it is strictly greater.</param>
/// <returns>The task returned by the version-based overload.</returns>
private Task RefreshMembershipIfNewer(GrainAddress address, GrainAddress? previousAddress = null)
{
    // Without a previous address there is nothing to compare against.
    if (previousAddress is null)
    {
        return RefreshMembershipIfNewer(address.MembershipVersion);
    }

    var newestVersion = previousAddress.MembershipVersion > address.MembershipVersion
        ? previousAddress.MembershipVersion
        : address.MembershipVersion;

    return RefreshMembershipIfNewer(newestVersion);
}

private Task RefreshMembershipIfNewer(List<GrainAddress> addresses)
{
var targetVersion = MembershipVersion.MinValue;
Expand Down Expand Up @@ -366,11 +353,6 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr
return;
}

if (status == SiloStatus.Dead)
{
runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo);
}

var activationsToShutdown = new List<IGrainContext>();
var resolver = grainDirectoryResolver ??= _serviceProvider.GetRequiredService<GrainDirectoryResolver>();
foreach (var activation in localActivations)
Expand Down Expand Up @@ -460,24 +442,14 @@ private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembe
}
}

/// <summary>
/// Decides whether the activation registered at <paramref name="address"/> should be treated as defunct
/// with respect to the supplied membership <paramref name="snapshot"/>.
/// </summary>
/// <param name="address">The registered grain address being evaluated.</param>
/// <param name="snapshot">The membership snapshot the address is evaluated against.</param>
/// <returns><see langword="true"/> when the activation should be removed; otherwise <see langword="false"/>.</returns>
// NOTE(review): this span appears to hold both the pre-change and post-change lines of a diff hunk
// (two consecutive signatures, two trailing return statements) - confirm which pair is current.
private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot)
internal static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot)
{
if (address.SiloAddress is not { } silo)
{
// An address that carries no silo is always reported as defunct.
return true;
}

if (snapshot.Members.TryGetValue(silo, out var member))
{
// If this is a known host, remove the activation if the host is dead.
return member.Status == SiloStatus.Dead;
}

// If this is not a known host, remove the activation if it was registered at an older membership version.
// This indicates that the host must have been removed.
// Hosts cannot activate grains before they are active, and we ensure that we refresh the membership before processing messages,
// so this is a reliable indicator of a defunct activation.
return address.MembershipVersion < snapshot.Version;
// Status-based form: defunct only when the snapshot reports the silo's status as Dead.
return snapshot.GetSiloStatus(silo) == SiloStatus.Dead;
}

internal SiloAddress? FindPredecessor(SiloAddress silo)
Expand Down Expand Up @@ -624,13 +596,13 @@ public async Task<AddressAndTag> RegisterAsync(GrainAddress address, GrainAddres
DirectoryInstruments.RegistrationsSingleActIssued.Add(1);
}

await RefreshMembershipIfNewer(address, previousAddress);
await RefreshMembershipIfNewer(address.MembershipVersion);

// see if the owner is somewhere else (returns null if we are owner)
var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync");

// on all silos other than first, we insert a retry delay and recheck owner before forwarding
if (hopCount > 0 && forwardAddress != null)
// After the first forward, we insert a retry delay and recheck owner before forwarding again
if (hopCount > 1 && forwardAddress != null)
Comment thread
ReubenBond marked this conversation as resolved.
{
await Task.Delay(RETRY_DELAY);
forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync");
Expand Down Expand Up @@ -680,7 +652,7 @@ public async Task UnregisterAfterNonexistingActivation(GrainAddress addr, SiloAd
{
LogTraceUnregisterAfterNonexistingActivation(addr, origin);

await RefreshMembershipIfNewer(addr);
await RefreshMembershipIfNewer(addr.MembershipVersion);

if (origin == null || this.directoryMembership.MembershipCache.Contains(origin))
{
Expand Down Expand Up @@ -709,13 +681,13 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus
if (hopCount == 0)
InvalidateCacheEntry(address);

await RefreshMembershipIfNewer(address);
await RefreshMembershipIfNewer(address.MembershipVersion);

// see if the owner is somewhere else (returns null if we are owner)
var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync");

// on all silos other than first, we insert a retry delay and recheck owner before forwarding
if (hopCount > 0 && forwardAddress != null)
// After the first forward, we insert a retry delay and recheck owner before forwarding again
if (hopCount > 1 && forwardAddress != null)
{
await Task.Delay(RETRY_DELAY);
forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync");
Expand Down
6 changes: 5 additions & 1 deletion src/Orleans.Runtime/Networking/SiloConnectionMaintainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ internal partial class SiloConnectionMaintainer : ILifecycleParticipant<ISiloLif
{
private readonly ConnectionManager connectionManager;
private readonly ISiloStatusOracle siloStatusOracle;
private readonly IRuntimeClient runtimeClient;
private readonly ILogger<SiloConnectionMaintainer> log;

/// <summary>
/// Creates a <see cref="SiloConnectionMaintainer"/> and stores the supplied services in its fields.
/// </summary>
/// <param name="connectionManager">The connection manager to keep.</param>
/// <param name="siloStatusOracle">The silo status oracle to keep.</param>
/// <param name="runtimeClient">The runtime client to keep.</param>
/// <param name="log">The logger to keep.</param>
public SiloConnectionMaintainer(
    ConnectionManager connectionManager,
    ISiloStatusOracle siloStatusOracle,
    IRuntimeClient runtimeClient,
    ILogger<SiloConnectionMaintainer> log)
    => (this.connectionManager, this.siloStatusOracle, this.runtimeClient, this.log)
        = (connectionManager, siloStatusOracle, runtimeClient, log);

Expand All @@ -42,6 +45,7 @@ public void SiloStatusChangeNotification(SiloAddress updatedSilo, SiloStatus sta
{
if (status == SiloStatus.Dead && updatedSilo != siloStatusOracle.SiloAddress)
{
this.runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo);
_ = Task.Run(() => this.CloseConnectionAsync(updatedSilo));
}
}
Expand All @@ -68,4 +72,4 @@ private async Task CloseConnectionAsync(SiloAddress silo)
)]
private static partial void LogExceptionWhileClosingConnections(ILogger logger, SiloAddress siloAddress, Exception exception);
}
}
}
71 changes: 71 additions & 0 deletions test/Orleans.Runtime.Internal.Tests/LocalGrainDirectoryTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System.Collections.Immutable;
using System.Net;
using Orleans.Runtime;
using Orleans.Runtime.GrainDirectory;
using Xunit;

namespace UnitTests;

/// <summary>
/// Tests for <c>LocalGrainDirectory.IsDefunctActivation</c>, exercised against small
/// single-member membership snapshots.
/// </summary>
[TestCategory("BVT")]
[TestCategory("GrainDirectory")]
public class LocalGrainDirectoryTests
{
    /// <summary>
    /// A silo present in the snapshot with a non-dead status must not have its activation flagged.
    /// </summary>
    [Theory]
    [InlineData(SiloStatus.Active)]
    [InlineData(SiloStatus.ShuttingDown)]
    [InlineData(SiloStatus.Stopping)]
    public void IsDefunctActivation_DoesNotRemoveNonDeadSilos(SiloStatus status)
    {
        // Arrange
        var host = CreateSiloAddress(1);
        var registration = CreateGrainAddress(host);
        var membership = CreateSnapshot(new ClusterMember(host, status, "silo"), version: 2);

        // Act
        var isDefunct = LocalGrainDirectory.IsDefunctActivation(registration, membership);

        // Assert
        Assert.False(isDefunct);
    }

    /// <summary>
    /// A silo present in the snapshot with status Dead must have its activation flagged.
    /// </summary>
    [Fact]
    public void IsDefunctActivation_RemovesDeadSilos()
    {
        // Arrange
        var host = CreateSiloAddress(1);
        var registration = CreateGrainAddress(host);
        var membership = CreateSnapshot(new ClusterMember(host, SiloStatus.Dead, "silo"), version: 2);

        // Act
        var isDefunct = LocalGrainDirectory.IsDefunctActivation(registration, membership);

        // Assert
        Assert.True(isDefunct);
    }

    /// <summary>
    /// A silo absent from the snapshot must not be flagged, even though the activation was
    /// registered at an older membership version than the snapshot's.
    /// </summary>
    [Fact]
    public void IsDefunctActivation_DoesNotRemoveUnknownSiloUntilKnownDead()
    {
        // Arrange: the only known member listens on a different port, so it is unrelated to the host.
        var host = CreateSiloAddress(1);
        var bystander = CreateSiloAddress(1, port: 11112);
        var registration = CreateGrainAddress(host, membershipVersion: 1);
        var membership = CreateSnapshot(new ClusterMember(bystander, SiloStatus.Active, "other"), version: 2);

        // Act
        var isDefunct = LocalGrainDirectory.IsDefunctActivation(registration, membership);

        // Assert
        Assert.False(isDefunct);
    }

    /// <summary>
    /// A silo absent from the snapshot must be flagged when the snapshot contains a member
    /// on the same endpoint with a higher generation.
    /// </summary>
    [Fact]
    public void IsDefunctActivation_RemovesSiloReplacedBySuccessor()
    {
        // Arrange: same endpoint, generation 2 instead of 1.
        var host = CreateSiloAddress(1);
        var replacement = CreateSiloAddress(2);
        var registration = CreateGrainAddress(host, membershipVersion: 1);
        var membership = CreateSnapshot(new ClusterMember(replacement, SiloStatus.Active, "silo"), version: 2);

        // Act
        var isDefunct = LocalGrainDirectory.IsDefunctActivation(registration, membership);

        // Assert
        Assert.True(isDefunct);
    }

    /// <summary>
    /// Builds a membership snapshot containing exactly one member at the given version.
    /// </summary>
    private static ClusterMembershipSnapshot CreateSnapshot(ClusterMember member, long version)
    {
        var members = ImmutableDictionary<SiloAddress, ClusterMember>.Empty.Add(member.SiloAddress, member);
        return new ClusterMembershipSnapshot(members, new MembershipVersion(version));
    }

    /// <summary>
    /// Builds a grain address on the given silo with a fresh grain key and activation id.
    /// </summary>
    private static GrainAddress CreateGrainAddress(SiloAddress siloAddress, long membershipVersion = 1)
    {
        var grainId = GrainId.Create("test-grain", Guid.NewGuid().ToString("N"));
        var activationId = ActivationId.NewId();

        return new GrainAddress
        {
            GrainId = grainId,
            ActivationId = activationId,
            SiloAddress = siloAddress,
            MembershipVersion = new MembershipVersion(membershipVersion)
        };
    }

    /// <summary>
    /// Builds a loopback silo address with the given generation and port.
    /// </summary>
    private static SiloAddress CreateSiloAddress(int generation, int port = 11111)
    {
        var endpoint = new IPEndPoint(IPAddress.Loopback, port);
        return SiloAddress.New(endpoint, generation);
    }
}
12 changes: 11 additions & 1 deletion test/TestInfrastructure/TestExtensions/XunitLoggerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace TestExtensions
{
public class XunitLoggerProvider : ILoggerProvider
{
private const string NoActiveTestExceptionMessage = "There is no currently active test.";

private readonly ITestOutputHelper output;

public XunitLoggerProvider(ITestOutputHelper output)
Expand Down Expand Up @@ -37,7 +39,15 @@ public void Dispose() { }

/// <summary>
/// Formats a log entry as "level [category.event] message" and writes it to the test output helper.
/// If the helper throws the <see cref="InvalidOperationException"/> whose message equals
/// <c>NoActiveTestExceptionMessage</c>, the entry is written to standard error instead.
/// </summary>
/// <typeparam name="TState">The type of the state object passed to <paramref name="formatter"/>.</typeparam>
/// <param name="logLevel">The level included at the start of the line.</param>
/// <param name="eventId">The event; its name is used when present, otherwise its numeric id.</param>
/// <param name="state">The state passed to <paramref name="formatter"/>.</param>
/// <param name="exception">The exception passed to <paramref name="formatter"/>.</param>
/// <param name="formatter">Produces the message text from <paramref name="state"/> and <paramref name="exception"/>.</param>
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func<TState, Exception, string> formatter)
{
// NOTE(review): the unguarded write directly below duplicates the guarded write that follows and looks
// like the pre-change line of a diff hunk - confirm it is not present in the actual file.
this.output.WriteLine($"{logLevel} [{this.category}.{eventId.Name ?? eventId.Id.ToString()}] {formatter(state, exception)}");
// Build the line once so the primary write and the fallback emit the same text.
var message = $"{logLevel} [{this.category}.{eventId.Name ?? eventId.Id.ToString()}] {formatter(state, exception)}";
try
{
this.output.WriteLine(message);
}
// The filter matches on the exact exception message; any other InvalidOperationException propagates.
catch (InvalidOperationException invalidOperationException) when (invalidOperationException.Message == NoActiveTestExceptionMessage)
{
// Fall back to standard error so the entry is still emitted.
Console.Error.WriteLine(message);
}
}
}
}
Expand Down
Loading