Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions src/Orleans.Runtime/GrainDirectory/CachedGrainLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ private async Task ListenToClusterChange()
var updates = this.clusterMembershipService.MembershipUpdates.WithCancellation(this.shutdownToken.Token);
await foreach (var snapshot in updates)
{
// Active filtering: detect silos that went down and try to clean proactively the directory
// Active filtering: detect dead silos and try to clean proactively the directory
var changes = snapshot.CreateUpdate(previousSnapshot).Changes;
var deadSilos = changes
.Where(member => member.Status.IsTerminating())
.Where(member => member.Status == SiloStatus.Dead)
.Select(member => member.SiloAddress)
.ToList();

Expand All @@ -187,6 +187,7 @@ private async Task ListenToClusterChange()
}

((ITestAccessor)this).LastMembershipVersion = snapshot.Version;
previousSnapshot = snapshot;
}
}

Expand All @@ -196,22 +197,16 @@ private bool IsKnownDeadSilo(GrainAddress grainAddress)
private bool IsKnownDeadSilo(SiloAddress siloAddress, MembershipVersion membershipVersion)
{
var current = this.clusterMembershipService.CurrentSnapshot;

// Check if the target silo is in the cluster
if (current.Members.TryGetValue(siloAddress, out var value))
{
// It is, check if it's alive
return value.Status.IsTerminating();
}

// We didn't find it in the cluster. If the silo entry is too old, it has been cleaned in the membership table: the entry isn't valid anymore.
// Otherwise, maybe the membership service isn't up to date yet. The entry should be valid
return current.Version > membershipVersion;
return siloAddress is null || current.GetSiloStatus(siloAddress, membershipVersion) == SiloStatus.Dead;
}

private static void ThrowUnsupportedGrainType(GrainId grainId) => throw new InvalidOperationException($"Unsupported grain type for grain {grainId}");

public void UpdateCache(GrainId grainId, SiloAddress siloAddress) => cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress }, 0);
public void UpdateCache(GrainId grainId, SiloAddress siloAddress)
{
var membershipVersion = this.clusterMembershipService.CurrentSnapshot.Version;
cache.AddOrUpdate(new GrainAddress { GrainId = grainId, SiloAddress = siloAddress, MembershipVersion = membershipVersion }, (int)membershipVersion.Value);
}
public void InvalidateCache(GrainId grainId) => cache.Remove(grainId);
public void InvalidateCache(GrainAddress address) => cache.Remove(address);
public bool TryLookupInCache(GrainId grainId, out GrainAddress address)
Expand All @@ -222,10 +217,10 @@ public bool TryLookupInCache(GrainId grainId, out GrainAddress address)
ThrowUnsupportedGrainType(grainId);
}

if (this.cache.LookUp(grainId, out address, out var version))
if (this.cache.LookUp(grainId, out address, out _))
{
// If the silo is dead, remove the entry
if (IsKnownDeadSilo(address.SiloAddress, new MembershipVersion(version)))
if (IsKnownDeadSilo(address))
{
address = default;
this.cache.Remove(grainId);
Expand Down
19 changes: 15 additions & 4 deletions src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@ internal sealed partial class GrainDirectoryHandoffManager
private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(250);
private readonly LocalGrainDirectory localDirectory;
private readonly ISiloStatusOracle siloStatusOracle;
private readonly IClusterMembershipService clusterMembershipService;
private readonly IInternalGrainFactory grainFactory;
private readonly ILogger logger;
private readonly Factory<LocalGrainDirectoryPartition> createPartion;
private readonly Queue<(string name, object state, Func<GrainDirectoryHandoffManager, object, Task> action)> pendingOperations = new();
private readonly AsyncLock executorLock = new AsyncLock();

internal GrainDirectoryHandoffManager(
LocalGrainDirectory localDirectory,
ISiloStatusOracle siloStatusOracle,
IClusterMembershipService clusterMembershipService,
IInternalGrainFactory grainFactory,
Factory<LocalGrainDirectoryPartition> createPartion,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<GrainDirectoryHandoffManager>();
this.localDirectory = localDirectory;
this.siloStatusOracle = siloStatusOracle;
this.clusterMembershipService = clusterMembershipService;
this.grainFactory = grainFactory;
this.createPartion = createPartion;
}

internal void ProcessSiloAddEvent(SiloAddress addedSilo)
Expand Down Expand Up @@ -128,9 +128,10 @@ private async Task AcceptExistingRegistrationsAsync(List<GrainAddress> singleAct
{
if (!this.localDirectory.Running) return;

var snapshot = this.clusterMembershipService.CurrentSnapshot;
for (var i = singleActivations.Count - 1; i >= 0; i--)
{
if (singleActivations[i].SiloAddress is not { } siloAddress || this.siloStatusOracle.GetApproximateSiloStatus(siloAddress) == SiloStatus.Dead)
if (!IsTransferableRegistration(singleActivations[i], snapshot))
{
singleActivations.RemoveAt(i);
}
Expand Down Expand Up @@ -199,6 +200,16 @@ private async Task DestroyDuplicateActivationsAsync(Dictionary<SiloAddress, List
}
}

internal static bool IsTransferableRegistration(GrainAddress address, ClusterMembershipSnapshot snapshot)
{
if (address.SiloAddress is not { } silo)
{
return false;
}

return snapshot.GetSiloStatus(silo, address.MembershipVersion) != SiloStatus.Dead;
}

private void EnqueueOperation(string name, object state, Func<GrainDirectoryHandoffManager, object, Task> action)
{
lock (this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ private GrainAddress RegisterCore(GrainAddress newAddress, GrainAddress? existin
return existing;
}

private bool IsSiloDead(GrainAddress existing) => _owner.ClusterMembershipSnapshot.GetSiloStatus(existing.SiloAddress) == SiloStatus.Dead;
private bool IsSiloDead(GrainAddress existing)
=> existing.SiloAddress is null || _owner.ClusterMembershipSnapshot.GetSiloStatus(existing.SiloAddress, existing.MembershipVersion) == SiloStatus.Dead;

[LoggerMessage(
Level = LogLevel.Trace,
Expand Down
62 changes: 21 additions & 41 deletions src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ILifec
private readonly IClusterMembershipService clusterMembershipService;
private readonly IInternalGrainFactory grainFactory;
private readonly ActivationDirectory localActivations;
private readonly InsideRuntimeClient runtimeClient;
private readonly IServiceProvider _serviceProvider;
private readonly CancellationTokenSource _membershipUpdatesCancellation = new();
private DirectoryMembership directoryMembership = DirectoryMembership.Default;
Expand Down Expand Up @@ -71,7 +70,6 @@ public LocalGrainDirectory(
this.clusterMembershipService = clusterMembershipService;
this.grainFactory = grainFactory;
this.localActivations = systemTargetShared.ActivationDirectory;
this.runtimeClient = systemTargetShared.RuntimeClient;

DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value, out this.disposeDirectoryCache);

Expand All @@ -82,7 +80,7 @@ public LocalGrainDirectory(
}

DirectoryPartition = grainDirectoryPartitionFactory();
HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, grainFactory, grainDirectoryPartitionFactory, loggerFactory);
HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, clusterMembershipService, grainFactory, loggerFactory);

// When DistributedGrainDirectory is active, it registers its own IRemoteGrainDirectory system targets.
// In that case, create the RemoteGrainDirectory objects (still needed for WorkItemGroup scheduling)
Expand Down Expand Up @@ -319,17 +317,6 @@ private List<SiloAddress> GetMembershipDifference(
return result;
}

private Task RefreshMembershipIfNewer(GrainAddress address, GrainAddress? previousAddress = null)
{
var targetVersion = address.MembershipVersion;
if (previousAddress is not null && previousAddress.MembershipVersion > targetVersion)
{
targetVersion = previousAddress.MembershipVersion;
}

return RefreshMembershipIfNewer(targetVersion);
}

private Task RefreshMembershipIfNewer(List<GrainAddress> addresses)
{
var targetVersion = MembershipVersion.MinValue;
Expand Down Expand Up @@ -366,11 +353,6 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr
return;
}

if (status == SiloStatus.Dead)
{
runtimeClient.BreakOutstandingMessagesToSilo(updatedSilo);
}

var activationsToShutdown = new List<IGrainContext>();
var resolver = grainDirectoryResolver ??= _serviceProvider.GetRequiredService<GrainDirectoryResolver>();
foreach (var activation in localActivations)
Expand Down Expand Up @@ -460,24 +442,14 @@ private void AdjustLocalCache(ClusterMembershipSnapshot snapshot, DirectoryMembe
}
}

private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot)
internal static bool IsDefunctActivation(GrainAddress address, ClusterMembershipSnapshot snapshot)
{
if (address.SiloAddress is not { } silo)
{
return true;
}

if (snapshot.Members.TryGetValue(silo, out var member))
{
// If this is a known host, remove the activation if the host is dead.
return member.Status == SiloStatus.Dead;
}

// If this is not a known host, remove the activation if it was registered at an older membership version.
// This indicates that the host must have been removed.
// Hosts cannot activate grains before they are active, and we ensure that we refresh the membership before processing messages,
// so this is a reliable indicator of a defunct activation.
return address.MembershipVersion < snapshot.Version;
return snapshot.GetSiloStatus(silo, address.MembershipVersion) == SiloStatus.Dead;
}

internal SiloAddress? FindPredecessor(SiloAddress silo)
Expand Down Expand Up @@ -624,13 +596,13 @@ public async Task<AddressAndTag> RegisterAsync(GrainAddress address, GrainAddres
DirectoryInstruments.RegistrationsSingleActIssued.Add(1);
}

await RefreshMembershipIfNewer(address, previousAddress);
await RefreshMembershipIfNewer(address.MembershipVersion);

// see if the owner is somewhere else (returns null if we are owner)
var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync");

// on all silos other than first, we insert a retry delay and recheck owner before forwarding
if (hopCount > 0 && forwardAddress != null)
// After the first forward, we insert a retry delay and recheck owner before forwarding again
if (hopCount > 1 && forwardAddress != null)
Comment thread
ReubenBond marked this conversation as resolved.
{
await Task.Delay(RETRY_DELAY);
forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "RegisterAsync");
Expand Down Expand Up @@ -667,7 +639,7 @@ public async Task<AddressAndTag> RegisterAsync(GrainAddress address, GrainAddres
// this way next local lookup will find this ActivationAddress in the cache and we will save a full lookup!
if (result.Address == null) return result;

if (!address.Equals(result.Address) || !IsValidSilo(address.SiloAddress)) return result;
if (!address.Equals(result.Address) || IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot)) return result;

// update the cache so next local lookup will find this ActivationAddress in the cache and we will save full lookup.
DirectoryCache.AddOrUpdate(result.Address, result.VersionTag);
Expand All @@ -680,7 +652,7 @@ public async Task UnregisterAfterNonexistingActivation(GrainAddress addr, SiloAd
{
LogTraceUnregisterAfterNonexistingActivation(addr, origin);

await RefreshMembershipIfNewer(addr);
await RefreshMembershipIfNewer(addr.MembershipVersion);

if (origin == null || this.directoryMembership.MembershipCache.Contains(origin))
{
Expand Down Expand Up @@ -709,13 +681,13 @@ public async Task UnregisterAsync(GrainAddress address, UnregistrationCause caus
if (hopCount == 0)
InvalidateCacheEntry(address);

await RefreshMembershipIfNewer(address);
await RefreshMembershipIfNewer(address.MembershipVersion);

// see if the owner is somewhere else (returns null if we are owner)
var forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync");

// on all silos other than first, we insert a retry delay and recheck owner before forwarding
if (hopCount > 0 && forwardAddress != null)
// After the first forward, we insert a retry delay and recheck owner before forwarding again
if (hopCount > 1 && forwardAddress != null)
{
await Task.Delay(RETRY_DELAY);
forwardAddress = this.CheckIfShouldForward(address.GrainId, hopCount, "UnregisterAsync");
Expand Down Expand Up @@ -872,7 +844,15 @@ public bool LocalLookup(GrainId grain, out AddressAndTag result)

public AddressAndTag GetLocalDirectoryData(GrainId grain) => DirectoryPartition.LookUpActivation(grain);

public GrainAddress? GetLocalCacheData(GrainId grain) => DirectoryCache.LookUp(grain, out var cache) && IsValidSilo(cache.SiloAddress) ? cache : null;
public GrainAddress? GetLocalCacheData(GrainId grain)
{
if (!DirectoryCache.LookUp(grain, out var cache))
{
return null;
}

return IsDefunctActivation(cache, clusterMembershipService.CurrentSnapshot) ? null : cache;
}

public async Task<AddressAndTag> LookupAsync(GrainId grainId, int hopCount = 0)
{
Expand Down Expand Up @@ -933,7 +913,7 @@ public async Task<AddressAndTag> LookupAsync(GrainId grainId, int hopCount = 0)
var result = await GetDirectoryReference(forwardAddress).LookupAsync(grainId, hopCount + 1);

// update the cache
if (result.Address is { } address && IsValidSilo(address.SiloAddress))
if (result.Address is { } address && !IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot))
{
DirectoryCache.AddOrUpdate(address, result.VersionTag);
}
Expand Down
23 changes: 12 additions & 11 deletions src/Orleans.Runtime/GrainDirectory/LocalGrainDirectoryPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,22 @@ internal sealed partial class LocalGrainDirectoryPartition
private Dictionary<GrainId, GrainInfo> partitionData;
private readonly object lockable;
private readonly ILogger log;
private readonly ISiloStatusOracle siloStatusOracle;
private readonly IClusterMembershipService clusterMembershipService;
private readonly IOptions<GrainDirectoryOptions> grainDirectoryOptions;

internal int Count { get { return partitionData.Count; } }

public LocalGrainDirectoryPartition(ISiloStatusOracle siloStatusOracle, IOptions<GrainDirectoryOptions> grainDirectoryOptions, ILoggerFactory loggerFactory)
public LocalGrainDirectoryPartition(IClusterMembershipService clusterMembershipService, IOptions<GrainDirectoryOptions> grainDirectoryOptions, ILoggerFactory loggerFactory)
{
partitionData = new Dictionary<GrainId, GrainInfo>();
lockable = new object();
log = loggerFactory.CreateLogger<LocalGrainDirectoryPartition>();
this.siloStatusOracle = siloStatusOracle;
this.clusterMembershipService = clusterMembershipService;
this.grainDirectoryOptions = grainDirectoryOptions;
}

private bool IsValidSilo(SiloAddress? silo) => silo is not null && siloStatusOracle.IsFunctionalDirectory(silo);
private bool IsDefunctActivation(GrainAddress? address)
=> address is null || LocalGrainDirectory.IsDefunctActivation(address, clusterMembershipService.CurrentSnapshot);

internal void Clear()
{
Expand Down Expand Up @@ -156,9 +157,11 @@ internal AddressAndTag AddSingleActivation(GrainAddress address, GrainAddress? p
{
LogTraceAddingSingleActivation(address.SiloAddress, address.GrainId, address.ActivationId);

if (!IsValidSilo(address.SiloAddress))
if (IsDefunctActivation(address))
{
var siloStatus = this.siloStatusOracle.GetApproximateSiloStatus(address.SiloAddress);
var siloStatus = address.SiloAddress is { } siloAddress
? this.clusterMembershipService.CurrentSnapshot.GetSiloStatus(siloAddress, address.MembershipVersion)
: SiloStatus.None;
throw new OrleansException($"Trying to register {address.GrainId} on invalid silo: {address.SiloAddress}. Known status: {siloStatus}");
}

Expand All @@ -170,10 +173,8 @@ internal AddressAndTag AddSingleActivation(GrainAddress address, GrainAddress? p
}
else
{
var siloAddress = grainInfo.Activation?.SiloAddress;

// If there is an existing entry pointing to an invalid silo then remove it
if (siloAddress != null && !IsValidSilo(siloAddress))
if (IsDefunctActivation(grainInfo.Activation))
{
partitionData[address.GrainId] = grainInfo = new GrainInfo();
}
Expand Down Expand Up @@ -230,7 +231,7 @@ internal AddressAndTag LookUpActivation(GrainId grain)
result = new(grainInfo.Activation, grainInfo.VersionTag);
}

if (!IsValidSilo(result.Address?.SiloAddress))
if (IsDefunctActivation(result.Address))
{
result = new(null, result.VersionTag);
}
Expand Down Expand Up @@ -308,7 +309,7 @@ internal List<GrainAddress> Split(Predicate<GrainId> predicate)

for (var i = result.Count - 1; i >= 0; i--)
{
if (!IsValidSilo(result[i].SiloAddress))
if (IsDefunctActivation(result[i]))
{
result.RemoveAt(i);
}
Expand Down
Loading