Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 0 additions & 93 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ namespace Orleans.Runtime
{
internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant<ISiloLifecycle>
{
private readonly SiloAddress _siloAddress;
private readonly ActivationCollector activationCollector;
private readonly GrainDirectoryResolver grainDirectoryResolver;
private readonly ActivationDirectory activations;
private readonly IServiceProvider serviceProvider;
private readonly ILogger logger;
Expand All @@ -30,8 +28,6 @@ internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecyclePartic
#endif

public Catalog(
ILocalSiloDetails localSiloDetails,
GrainDirectoryResolver grainDirectoryResolver,
ActivationDirectory activationDirectory,
ActivationCollector activationCollector,
IServiceProvider serviceProvider,
Expand All @@ -40,8 +36,6 @@ public Catalog(
SystemTargetShared shared)
Comment thread
ReubenBond marked this conversation as resolved.
: base(Constants.CatalogType, shared)
{
this._siloAddress = localSiloDetails.SiloAddress;
this.grainDirectoryResolver = grainDirectoryResolver;
this.activations = activationDirectory;
this.serviceProvider = serviceProvider;
this.grainActivator = grainActivator;
Expand Down Expand Up @@ -328,99 +322,12 @@ await Parallel.ForEachAsync(addresses, (activationAddress, cancellationToken) =>
});
}

// TODO move this logic in the LocalGrainDirectory
/// <summary>
/// Handles a status change of a silo by deactivating every local activation whose
/// primary directory partition owner, as reported by <paramref name="directory"/>, is that silo.
/// </summary>
/// <param name="directory">Local grain directory used to look up the primary partition owner of each grain.</param>
/// <param name="updatedSilo">The silo whose status changed.</param>
/// <param name="status">The new status of <paramref name="updatedSilo"/>.</param>
internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress updatedSilo, SiloStatus status)
{
    // Events about this silo itself are ignored.
    if (updatedSilo.Equals(_siloAddress))
    {
        return;
    }

    // We deactivate those activations when the silo goes into any of the ShuttingDown/Stopping/Dead states,
    // since this is what the Directory is doing as well. The Directory removes a silo based on all 3 of those
    // statuses, thus it will only deliver a "remove" notification for a given silo to us once. Therefore, we
    // need to react the first time we are notified.
    // We may review the directory behavior in the future and treat ShuttingDown differently ("drain only"),
    // and then this code will have to change as well.
    if (!status.IsTerminating())
    {
        return;
    }

    if (status == SiloStatus.Dead)
    {
        this.RuntimeClient.BreakOutstandingMessagesToSilo(updatedSilo);
    }

    var affected = new List<IGrainContext>();
    try
    {
        // Scan all activations in the activation directory and collect the ones for which the removed silo
        // is the primary partition owner.
        // Note: no lock is taken here; per the original note, ActivationDirectory uses a ConcurrentDictionary,
        // which provides thread-safe enumeration.
        foreach (var entry in activations)
        {
            try
            {
                var context = entry.Value;

                // Only activations registered in the default grain directory are considered.
                if (context.GetComponent<PlacementStrategy>() is not { IsUsingGrainDirectory: true })
                {
                    continue;
                }

                if (!grainDirectoryResolver.IsUsingDefaultDirectory(context.GrainId.Type))
                {
                    continue;
                }

                if (updatedSilo.Equals(directory.GetPrimaryForGrain(context.GrainId)))
                {
                    affected.Add(context);
                }
            }
            catch (Exception exc)
            {
                // A failure while inspecting one activation must not prevent the others from being examined.
                LogErrorCatalogSiloStatusChangeNotification(new(updatedSilo), exc);
            }
        }

        if (affected.Count > 0)
        {
            LogInfoCatalogSiloStatusChangeNotification(affected.Count, new(updatedSilo));
        }
    }
    finally
    {
        // Runs even when the scan above throws, so that whatever was collected so far is still deactivated.
        if (affected.Count > 0)
        {
            var reason = new DeactivationReason(
                DeactivationReasonCode.DirectoryFailure,
                $"This activation is being deactivated due to a failure of server {updatedSilo}, since it was responsible for this activation's grain directory registration.");

            LogDebugDeactivateActivations(affected.Count);

            foreach (var context in affected)
            {
                context.Deactivate(reason, CancellationToken.None);
            }
        }
    }
}

/// <summary>
/// Lifecycle participation hook. No lifecycle stage is subscribed to here; the only work performed
/// is resolving the <see cref="ISiloStatusOracle"/> from the service provider and caching it.
/// </summary>
/// <param name="lifecycle">The silo lifecycle; not used by this implementation.</param>
/// <remarks>
/// NOTE(review): the original comment says this method exists mainly so that the instance gets created
/// and can register itself in the activation directory - confirm against the registration code.
/// </remarks>
void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
    => _siloStatusOracle = serviceProvider.GetRequiredService<ISiloStatusOracle>();

/// <summary>
/// Wrapper around a <see cref="SiloAddress"/> for use as a log argument: its string form is
/// produced by <c>ToStringWithHashCode()</c>, and only when <see cref="ToString"/> is called.
/// </summary>
private readonly struct SiloAddressLogValue(SiloAddress silo)
{
    /// <summary>Returns the wrapped silo address formatted via <c>ToStringWithHashCode()</c>.</summary>
    public override string ToString()
    {
        return silo.ToStringWithHashCode();
    }
}

/// <summary>
/// Logs, at Error level, an exception thrown while the catalog was handling the removal of a silo.
/// </summary>
/// <param name="silo">Log wrapper for the address of the removed silo; fills the <c>{Silo}</c> placeholder.</param>
/// <param name="exception">The exception that was caught.</param>
[LoggerMessage(
Level = LogLevel.Error,
EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification_Exception,
Message = "Catalog has thrown an exception while handling removal of silo {Silo}"
)]
private partial void LogErrorCatalogSiloStatusChangeNotification(SiloAddressLogValue silo, Exception exception);

/// <summary>
/// Logs, at Information level, how many activations the catalog is deactivating because of a silo failure.
/// </summary>
/// <param name="count">Number of activations being deactivated; fills the <c>{Count}</c> placeholder.</param>
/// <param name="silo">Log wrapper for the address of the failed silo; fills the <c>{Silo}</c> placeholder.</param>
[LoggerMessage(
Level = LogLevel.Information,
EventId = (int)ErrorCode.Catalog_SiloStatusChangeNotification,
Message = "Catalog is deactivating {Count} activations due to a failure of silo {Silo}, since it is a primary directory partition to these grain ids."
)]
private partial void LogInfoCatalogSiloStatusChangeNotification(int count, SiloAddressLogValue silo);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Unregistered activation {Activation}")]
Expand Down
49 changes: 23 additions & 26 deletions src/Orleans.Runtime/GrainDirectory/GrainDirectoryHandoffManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace Orleans.Runtime.GrainDirectory
internal sealed partial class GrainDirectoryHandoffManager
{
private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(250);
private const int MAX_OPERATION_DEQUEUE = 2;
private readonly LocalGrainDirectory localDirectory;
private readonly ISiloStatusOracle siloStatusOracle;
private readonly IInternalGrainFactory grainFactory;
Expand Down Expand Up @@ -71,6 +70,11 @@ internal void ProcessSiloAddEvent(SiloAddress addedSilo)
private async Task ProcessAddedSiloAsync(SiloAddress addedSilo, List<GrainAddress> splitPartListSingle)
{
if (!this.localDirectory.Running) return;
if (!addedSilo.Equals(localDirectory.FindSuccessor(localDirectory.MyAddress)))
{
LogDebugNotImmediateSuccessor(logger, addedSilo);
return;
}

if (this.siloStatusOracle.GetApproximateSiloStatus(addedSilo) == SiloStatus.Active)
{
Expand Down Expand Up @@ -124,6 +128,16 @@ private async Task AcceptExistingRegistrationsAsync(List<GrainAddress> singleAct
{
if (!this.localDirectory.Running) return;

for (var i = singleActivations.Count - 1; i >= 0; i--)
{
if (singleActivations[i].SiloAddress is not { } siloAddress || this.siloStatusOracle.GetApproximateSiloStatus(siloAddress).IsTerminating())
{
singleActivations.RemoveAt(i);
}
}

if (singleActivations.Count == 0) return;

LogDebugAcceptingRegistrations(logger, singleActivations.Count);

var tasks = singleActivations.Select(addr => this.localDirectory.RegisterAsync(addr, previousAddress: null, 1)).ToArray();
Expand Down Expand Up @@ -201,7 +215,6 @@ private async Task ExecutePendingOperations()
{
using (await executorLock.LockAsync())
{
var dequeueCount = 0;
while (true)
{
// Get the next operation, or exit if there are none.
Expand All @@ -213,34 +226,23 @@ private async Task ExecutePendingOperations()
op = this.pendingOperations.Peek();
}

dequeueCount++;

try
{
await op.Action(this, op.State);
// Success, reset the dequeue count
dequeueCount = 0;
}
catch (Exception exception)
{
if (dequeueCount < MAX_OPERATION_DEQUEUE)
{
LogWarningOperationFailedRetry(logger, exception, op.Name);
await Task.Delay(RetryDelay);
}
else
lock (this)
{
LogWarningOperationFailedNoRetry(logger, exception, op.Name);
this.pendingOperations.Dequeue();
}
}
if (dequeueCount == 0 || dequeueCount >= MAX_OPERATION_DEQUEUE)
catch (Exception exception)
{
lock (this)
if (!this.localDirectory.Running)
{
// Remove the operation from the queue if it was a success
// or if we tried too many times
this.pendingOperations.Dequeue();
return;
}

LogWarningOperationFailedRetry(logger, exception, op.Name);
await Task.Delay(RetryDelay);
}
}
}
Expand Down Expand Up @@ -335,10 +337,5 @@ private readonly struct GrainAddressesLogValue(List<GrainAddress> grainAddresses
)]
private static partial void LogWarningOperationFailedRetry(ILogger logger, Exception exception, string operation);

/// <summary>
/// Logs, at Warning level, that a named operation failed and will not be retried.
/// </summary>
/// <param name="logger">The logger to write to.</param>
/// <param name="exception">The exception that caused the operation to fail.</param>
/// <param name="operation">Name of the failed operation; fills the <c>{Operation}</c> placeholder.</param>
[LoggerMessage(
Level = LogLevel.Warning,
Message = "{Operation} failed, will NOT be retried"
)]
private static partial void LogWarningOperationFailedNoRetry(ILogger logger, Exception exception, string operation);
}
}
Loading
Loading