Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,17 @@ private async Task ExecutePendingOperations()

LogWarningOperationFailedRetry(logger, exception, op.Name);
await Task.Delay(RetryDelay);
if (!this.localDirectory.Running)
{
return;
}

// Keep retrying failed handoff work, but let later queued operations make progress.
lock (this)
{
this.pendingOperations.Dequeue();
this.pendingOperations.Enqueue(op);
}
}
}
}
Expand Down
7 changes: 0 additions & 7 deletions src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,5 @@ internal interface ILocalGrainDirectory : IDhtGrainDirectory
/// Attempts to find the specified grain in the directory cache.
/// </summary>
bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address);

/// <summary>
/// For determining message forwarding logic, we sometimes check if a silo is part of this cluster or not
/// </summary>
/// <param name="silo">the address of the silo</param>
/// <returns>true if the silo is known to be part of this cluster</returns>
bool IsSiloInCluster(SiloAddress silo);
}
}
124 changes: 57 additions & 67 deletions src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ public LocalGrainDirectory(

DirectoryInstruments.RegisterDirectoryPartitionSizeObserve(() => DirectoryPartition.Count);
DirectoryInstruments.RegisterMyPortionRingDistanceObserve(() => RingDistanceToSuccessor());
DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => (((float)this.RingDistanceToSuccessor()) / ((float)(int.MaxValue * 2L))) * 100);
DirectoryInstruments.RegisterMyPortionRingPercentageObserve(() => this.RingDistanceToSuccessor() / (float)(int.MaxValue * 2L) * 100);
DirectoryInstruments.RegisterMyPortionAverageRingPercentageObserve(() =>
{
var ring = this.directoryMembership.MembershipRingList;
return ring.Count == 0 ? 0 : ((float)100 / (float)ring.Count);
return ring.Count == 0 ? 0 : (100 / (float)ring.Count);
});
DirectoryInstruments.RegisterRingSizeObserve(() => this.directoryMembership.MembershipRingList.Count);
_serviceProvider = serviceProvider;
Expand Down Expand Up @@ -173,17 +173,16 @@ public async Task StopAsync()

private async Task ProcessMembershipUpdates(CancellationToken cancellationToken)
{
var snapshot = clusterMembershipService.CurrentSnapshot;
while (!cancellationToken.IsCancellationRequested)
{
try
{
await ApplyMembershipSnapshot(snapshot);
await ApplyMembershipSnapshot();

await foreach (var update in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken))
await foreach (var _ in clusterMembershipService.MembershipUpdates.WithCancellation(cancellationToken))
{
snapshot = update;
await ApplyMembershipSnapshot(snapshot);
// Always apply the latest snapshot.
await ApplyMembershipSnapshot();
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
Expand All @@ -207,58 +206,58 @@ private async Task ProcessMembershipUpdates(CancellationToken cancellationToken)
LogWarningRefreshingClusterMembershipFailed(refreshException);
}

snapshot = clusterMembershipService.CurrentSnapshot;
await Task.Delay(RETRY_DELAY, cancellationToken).SuppressThrowing();
}
}
}

/// <summary>
/// Applies the cluster membership service's current snapshot to this directory.
/// The work is submitted through <c>CacheValidator.RunOrQueueTask</c>.
/// </summary>
/// <returns>The task returned by <c>CacheValidator.RunOrQueueTask</c> for the submitted work.</returns>
private Task ApplyMembershipSnapshot()
{
    return CacheValidator.RunOrQueueTask(() =>
    {
        ApplyMembershipSnapshotCore();
        return Task.CompletedTask;
    });

    void ApplyMembershipSnapshotCore()
    {
        if (!Running)
        {
            return;
        }

        // The snapshot is read here, when the submitted work executes, instead of being captured by the caller.
        var snapshot = clusterMembershipService.CurrentSnapshot;
        var previousSnapshot = hasAppliedClusterMembershipSnapshot ? appliedClusterMembershipSnapshot : ClusterMembershipSnapshot.Default;
        if (hasAppliedClusterMembershipSnapshot && snapshot.Version <= previousSnapshot.Version)
        {
            // Nothing newer than the snapshot that was applied last time.
            return;
        }

        var previousMembership = CreateDirectoryMembership(previousSnapshot);
        var targetMembership = CreateDirectoryMembership(snapshot);

        // The new membership is published before the status-change and adjustment steps below run.
        this.directoryMembership = targetMembership;

        var removedSilos = GetMembershipDifference(previousMembership, targetMembership);
        var addedSilos = GetMembershipDifference(targetMembership, previousMembership);

        ProcessSiloStatusChanges(snapshot, previousSnapshot, previousMembership);
        AdjustLocalDirectory(snapshot);
        AdjustLocalCache(snapshot, targetMembership);

        foreach (var silo in removedSilos)
        {
            LogDebugSiloRemovedSilo(MyAddress, silo);
        }

        foreach (var silo in addedSilos)
        {
            HandoffManager.ProcessSiloAddEvent(silo);
            LogDebugSiloAddedSilo(MyAddress, silo);
        }

        // Record what was applied so that the version check above can skip stale or repeated snapshots.
        appliedClusterMembershipSnapshot = snapshot;
        hasAppliedClusterMembershipSnapshot = true;
    }
}

private void ProcessSiloStatusChanges(
Expand Down Expand Up @@ -352,14 +351,12 @@ private async Task RefreshMembershipIfNewer(MembershipVersion targetVersion)
return;
}

var snapshot = clusterMembershipService.CurrentSnapshot;
if (targetVersion > snapshot.Version)
if (targetVersion > clusterMembershipService.CurrentSnapshot.Version)
{
await clusterMembershipService.Refresh(targetVersion);
snapshot = clusterMembershipService.CurrentSnapshot;
}

await ApplyMembershipSnapshot(snapshot);
await ApplyMembershipSnapshot();
}

private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddress updatedSilo, SiloStatus status)
Expand Down Expand Up @@ -406,9 +403,9 @@ private void OnSiloStatusChange(DirectoryMembership previousMembership, SiloAddr
return;
}

LogInfoSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo));
LogInfoSiloStatusChangeNotification(activationsToShutdown.Count, new(updatedSilo), status);

var reasonText = $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration.";
var reasonText = $"This activation is being deactivated because server {updatedSilo} entered status {status} and was responsible for this activation's grain directory registration.";
var reason = new DeactivationReason(DeactivationReasonCode.DirectoryFailure, reasonText);
foreach (var activation in activationsToShutdown)
{
Expand Down Expand Up @@ -472,9 +469,14 @@ private static bool IsDefunctActivation(GrainAddress address, ClusterMembershipS

if (snapshot.Members.TryGetValue(silo, out var member))
{
// If this is a known host, remove the activation if the host is dead.
return member.Status == SiloStatus.Dead;
}

// If this is not a known host, remove the activation if it was registered at an older membership version.
// This indicates that the host must have been removed.
// Hosts cannot activate grains before they are active, and we ensure that we refresh the membership before processing messages,
// so this is a reliable indicator of a defunct activation.
return address.MembershipVersion < snapshot.Version;
}

Expand Down Expand Up @@ -763,7 +765,6 @@ private void UnregisterOrPutInForwardList(List<GrainAddress> addresses, Unregist
}
}


public async Task UnregisterManyAsync(List<GrainAddress> addresses, UnregistrationCause cause, int hopCount)
{
if (hopCount > 0)
Expand Down Expand Up @@ -1021,11 +1022,6 @@ private static int CompareSiloAddress(SiloAddress left, SiloAddress right)
return hashComparison != 0 ? hashComparison : left.CompareTo(right);
}

public bool IsSiloInCluster(SiloAddress silo)
{
return this.directoryMembership.MembershipCache.Contains(silo);
}

/// <summary>
/// Adds or updates the directory cache entry that maps <paramref name="grainId"/> to <paramref name="siloAddress"/>.
/// </summary>
/// <param name="grainId">The grain the cache entry is for.</param>
/// <param name="siloAddress">The silo recorded in the cached address.</param>
public void AddOrUpdateCacheEntry(GrainId grainId, SiloAddress siloAddress)
{
    var cachedAddress = new GrainAddress { GrainId = grainId, SiloAddress = siloAddress };
    this.DirectoryCache.AddOrUpdate(cachedAddress, 0);
}
/// <summary>
/// Looks up <paramref name="grainId"/> via <c>GetLocalCacheData</c>.
/// </summary>
/// <param name="grainId">The grain to look up.</param>
/// <param name="address">The value returned by <c>GetLocalCacheData</c>; non-null exactly when this method returns <see langword="true"/>.</param>
/// <returns><see langword="true"/> if <c>GetLocalCacheData</c> returned a non-null address; otherwise <see langword="false"/>.</returns>
public bool TryCachedLookup(GrainId grainId, [NotNullWhen(true)] out GrainAddress? address)
{
    address = GetLocalCacheData(grainId);
    return address is not null;
}
void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
Expand Down Expand Up @@ -1070,13 +1066,13 @@ private readonly struct SiloHashLogValue(SiloAddress? silo)
/// <summary>
/// Logs, at information level, that activations are being deactivated because a silo that was
/// the primary directory partition for their grain ids entered the given status.
/// </summary>
/// <param name="count">Number of activations being deactivated (the <c>{Count}</c> placeholder).</param>
/// <param name="silo">The silo whose status changed (the <c>{Silo}</c> placeholder).</param>
/// <param name="status">The status the silo entered (the <c>{Status}</c> placeholder).</param>
[LoggerMessage(
    Level = LogLevel.Information,
    EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification,
    Message = "LocalGrainDirectory is deactivating {Count} activations because silo {Silo} entered status {Status} and was the primary directory partition for these grain ids."
)]
private partial void LogInfoSiloStatusChangeNotification(int count, SiloAddressLogValue silo, SiloStatus status);

/// <summary>
/// Logs, at error level, an exception thrown while deactivating a single activation
/// because of the removal of a silo.
/// </summary>
/// <param name="exception">The exception that was thrown.</param>
/// <param name="grainId">The grain whose activation was being deactivated (the <c>{GrainId}</c> placeholder).</param>
/// <param name="silo">The removed silo (the <c>{Silo}</c> placeholder).</param>
[LoggerMessage(
    Level = LogLevel.Error,
    EventId = (int)ErrorCode.Catalog_DeactivateActivation_Exception,
    Message = "LocalGrainDirectory has thrown an exception while deactivating activation {GrainId} due to removal of silo {Silo}."
)]
private partial void LogErrorDeactivatingActivationForRemovedSilo(Exception exception, GrainId grainId, SiloAddressLogValue silo);
Expand Down Expand Up @@ -1215,27 +1211,21 @@ private readonly struct SiloHashLogValue(SiloAddress? silo)
)]
private partial void LogWarningDeleteGrainAsyncNotOwner(GrainId grainId, SiloAddress? forwardAddress, int hopCount);

/// <summary>
/// Immutable pairing of a list of silos with a set of silos, used as the directory's view of membership.
/// </summary>
/// <param name="membershipRingList">The list exposed as <see cref="MembershipRingList"/>; stored as supplied.</param>
/// <param name="membershipCache">The set exposed as <see cref="MembershipCache"/>; stored as supplied.</param>
private class DirectoryMembership(ImmutableList<SiloAddress> membershipRingList, ImmutableHashSet<SiloAddress> membershipCache)
{
    /// <summary>A membership whose list and set are both empty.</summary>
    public static DirectoryMembership Default { get; } = new DirectoryMembership([], []);

    /// <summary>
    /// Builds a membership from <paramref name="members"/>. The list is sorted with
    /// <c>CompareSiloAddress</c>, and the set is populated from that same sorted list.
    /// </summary>
    /// <param name="members">The silos to include.</param>
    /// <returns>A new <see cref="DirectoryMembership"/> containing the supplied silos.</returns>
    public static DirectoryMembership Create(IEnumerable<SiloAddress> members)
    {
        var builder = ImmutableList.CreateBuilder<SiloAddress>();
        builder.AddRange(members);
        builder.Sort(CompareSiloAddress);
        var ring = builder.ToImmutable();
        return new DirectoryMembership(ring, [.. ring]);
    }

    /// <summary>The silos as a list; sorted by <c>CompareSiloAddress</c> when built through <see cref="Create"/>.</summary>
    public ImmutableList<SiloAddress> MembershipRingList { get; } = membershipRingList;

    /// <summary>The silos as a set.</summary>
    public ImmutableHashSet<SiloAddress> MembershipCache { get; } = membershipCache;
}
}
}
Loading