Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,8 @@ void ProcessRequestsToInvalidActivation()
DeactivationReason = new(DeactivationReasonCode.ActivationUnresponsive,
$"{DeactivationReason.Description}. Activation {this} has been deactivating since {DeactivationStartTime.Value} and is likely stuck");
}

AbandonStuckDeactivatingActivation();
}

if (!IsStuckDeactivating && !IsStuckProcessingMessage)
Expand All @@ -1161,6 +1163,20 @@ void ProcessRequestsToInvalidActivation()
}
}

// Gives up on an activation whose deactivation appears to be stuck: logs a warning, clears the
// forwarding address, unregisters this activation, and completes the deactivation completion source.
void AbandonStuckDeactivatingActivation()
{
// Capture the forwarding address before it is cleared below so that the warning can still report it.
var forwardingAddress = ForwardingAddress;
LogWarningAbandoningStuckDeactivatingActivation(_shared.Logger, this, forwardingAddress);

// The migration target is not proven until deactivation reaches StartMigrationAsync.
// If deactivation is stuck before then, re-address messages instead of forwarding to a
// target which may not have a valid replacement activation.
ForwardingAddress = null;
UnregisterMessageTarget();

// The unregistration task is not awaited here; Ignore() is called on it instead.
// NOTE(review): presumably this means a failed unregistration is not surfaced to this caller - confirm.
_shared.InternalRuntime.GrainLocator.Unregister(Address, UnregistrationCause.Force).Ignore();

// TrySetResult (rather than SetResult) does not throw if the completion source was already completed.
GetDeactivationCompletionSource().TrySetResult(true);
}

bool MayInvokeRequest(Message incoming)
{
if (!IsCurrentlyExecuting)
Expand Down Expand Up @@ -2589,6 +2605,11 @@ private static partial void LogWarningDispatcher_ExtendedMessageProcessing(
Message blockingRequest,
Message message);

// Source-generated warning-level log method, emitted when a stuck deactivating activation is abandoned.
// forwardingAddress is nullable and is rendered through the {ForwardingAddress} placeholder.
[LoggerMessage(
Level = LogLevel.Warning,
Message = "Abandoning stuck deactivating activation {Activation}. ForwardingAddress={ForwardingAddress}")]
private static partial void LogWarningAbandoningStuckDeactivatingActivation(ILogger logger, ActivationData activation, SiloAddress? forwardingAddress);

private readonly struct ActivationDataLogValue(ActivationData activation, bool includeExtraDetails = false)
{
public override string ToString() => activation.ToDetailedString(includeExtraDetails);
Expand Down
107 changes: 107 additions & 0 deletions test/Orleans.DefaultCluster.Tests/Migration/MigrationTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using Microsoft.Extensions.DependencyInjection;
using Orleans.Concurrency;
using Orleans.Configuration;
using Orleans.Core.Internal;
using Orleans.Placement;
using Orleans.Runtime;
Expand Down Expand Up @@ -302,6 +305,110 @@ public async Task FailRehydrationTest()
}
}

/// <summary>
/// Cluster tests covering the case where an activation gets stuck while deactivating for migration.
/// </summary>
public class StuckDeactivationRecoveryTests : TestClusterPerTest
{
    /// <summary>
    /// Adds <see cref="Configurator"/> as a silo configurator for the test cluster.
    /// </summary>
    protected override void ConfigureTestCluster(TestClusterBuilder builder) =>
        builder.AddSiloBuilderConfigurator<Configurator>();

    /// <summary>
    /// Blocks a grain inside a request, asks it to migrate to a different silo, and then checks that a
    /// subsequent call is answered by an activation with a different activation id than the original.
    /// </summary>
    [Fact, TestCategory("BVT")]
    public async Task StuckDeactivatingActivationIsAbandoned()
    {
        // Upper bound applied to each individual step which is expected to complete promptly.
        var stepTimeout = TimeSpan.FromSeconds(10);

        var grain = GrainFactory.GetGrain<IStuckDeactivationTestGrain>(GetRandomGrainId());
        var grainId = grain.GetGrainId();
        var addressBeforeMigration = await grain.GetGrainAddress();

        // Pick any active silo other than the one currently hosting the grain as the migration target.
        var migrationTarget = HostedCluster.GetActiveSilos()
            .Select(silo => silo.SiloAddress)
            .First(candidate => !candidate.Equals(addressBeforeMigration.SiloAddress));

        // Deliberately not awaited yet: this request only completes once the grain is released.
        var blockedRequest = grain.BlockUntilReleased();

        try
        {
            await grain.WaitUntilBlocked().WaitAsync(stepTimeout);
            await grain.StartMigrationTo(migrationTarget).AsTask().WaitAsync(stepTimeout);

            // Fixed pause before probing the grain again; presumably long enough for the 250 ms
            // timeouts configured in Configurator to elapse.
            await Task.Delay(TimeSpan.FromSeconds(1));

            var addressAfterRecovery = await grain.GetGrainAddress().AsTask().WaitAsync(stepTimeout);

            Assert.NotEqual(addressBeforeMigration.ActivationId, addressAfterRecovery.ActivationId);
        }
        finally
        {
            // Always unblock the original request, even when an assertion or timeout above failed.
            StuckDeactivationTestGrain.Release(grainId);
        }

        // The blocked request must finish within the timeout once it has been released.
        var firstToFinish = await Task.WhenAny(blockedRequest, Task.Delay(stepTimeout));
        Assert.Same(blockedRequest, firstToFinish);
        await blockedRequest;
    }

    /// <summary>
    /// Shortens the request-processing and deactivation timeouts to 250 ms and sets the response
    /// timeout to 20 seconds.
    /// </summary>
    private sealed class Configurator : ISiloConfigurator
    {
        public void Configure(ISiloBuilder hostBuilder)
        {
            hostBuilder.Configure<SiloMessagingOptions>(static messaging =>
            {
                messaging.MaxRequestProcessingTime = TimeSpan.FromMilliseconds(250);
                messaging.ResponseTimeout = TimeSpan.FromSeconds(20);
            });

            hostBuilder.Configure<GrainCollectionOptions>(static collection =>
            {
                collection.DeactivationTimeout = TimeSpan.FromMilliseconds(250);
            });
        }
    }
}

/// <summary>
/// Test grain contract used to hold an activation inside a request while a migration is requested.
/// </summary>
public interface IStuckDeactivationTestGrain : IGrainWithIntegerKey
{
/// <summary>
/// Signals that the grain has entered the blocking call and returns a task which completes only
/// after the grain's block state has been released.
/// </summary>
Task BlockUntilReleased();

/// <summary>
/// Returns a task which completes once <see cref="BlockUntilReleased"/> has started executing.
/// Marked <see cref="AlwaysInterleaveAttribute"/> so that it can be invoked while the blocking call is outstanding.
/// </summary>
[AlwaysInterleave]
Task WaitUntilBlocked();

/// <summary>
/// Requests that the grain migrate when idle, using <paramref name="targetHost"/> as the placement hint.
/// Marked <see cref="AlwaysInterleaveAttribute"/> so that it can be invoked while the blocking call is outstanding.
/// </summary>
/// <param name="targetHost">The silo supplied as the placement hint for the migration.</param>
[AlwaysInterleave]
ValueTask StartMigrationTo(SiloAddress targetHost);

/// <summary>
/// Returns the address of the activation which handled the call.
/// </summary>
ValueTask<GrainAddress> GetGrainAddress();
}

/// <summary>
/// Test grain which can be parked inside a request until the test releases it through
/// <see cref="Release"/>. The block state is kept in a static dictionary keyed by grain id.
/// </summary>
[RandomPlacement]
public class StuckDeactivationTestGrain : Grain, IStuckDeactivationTestGrain
{
    // Static, so the state is looked up by grain id rather than held on a grain instance.
    private static readonly ConcurrentDictionary<GrainId, BlockState> BlockStates = new();

    /// <summary>
    /// Marks the grain as blocked and returns a task which completes once the state has been released.
    /// </summary>
    public Task BlockUntilReleased()
    {
        var blockState = GetBlockState();
        blockState.Started.TrySetResult();
        return blockState.Released.Task;
    }

    /// <summary>
    /// Returns a task which completes once <see cref="BlockUntilReleased"/> has signalled that it started.
    /// </summary>
    public Task WaitUntilBlocked()
    {
        return GetBlockState().Started.Task;
    }

    /// <summary>
    /// Stores <paramref name="targetHost"/> as the placement hint and requests migration when idle.
    /// </summary>
    public ValueTask StartMigrationTo(SiloAddress targetHost)
    {
        RequestContext.Set(IPlacementDirector.PlacementHintKey, targetHost);
        MigrateOnIdle();
        return ValueTask.CompletedTask;
    }

    /// <summary>
    /// Returns the address of this activation.
    /// </summary>
    public ValueTask<GrainAddress> GetGrainAddress()
    {
        return new ValueTask<GrainAddress>(GrainContext.Address);
    }

    /// <summary>
    /// Removes the block state for <paramref name="grainId"/>, if one exists, and completes its
    /// release signal. Does nothing when no state is registered for that grain id.
    /// </summary>
    public static void Release(GrainId grainId)
    {
        if (!BlockStates.TryRemove(grainId, out var removedState))
        {
            return;
        }

        removedState.Released.TrySetResult();
    }

    // Gets the block state for this grain's id, creating it on first use.
    private BlockState GetBlockState()
    {
        return BlockStates.GetOrAdd(GrainContext.GrainId, static _ => new BlockState());
    }

    /// <summary>
    /// Pair of signals: one completed when the blocking call starts, one completed on release.
    /// </summary>
    private sealed class BlockState
    {
        public TaskCompletionSource Started { get; } =
            new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

        public TaskCompletionSource Released { get; } =
            new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    }
}

public interface IMigrationTestGrain : IGrainWithIntegerKey
{
ValueTask<GrainAddress> GetGrainAddress();
Expand Down
Loading