Abandon activations which become stuck while deactivating

Runtime (src/Orleans.Runtime/Catalog/ActivationData.cs):
  In the block that rewrites DeactivationReason to ActivationUnresponsive for
  a stuck deactivation, call the new AbandonStuckDeactivatingActivation local
  function. It logs a warning (including the current ForwardingAddress),
  clears ForwardingAddress, calls UnregisterMessageTarget(), issues a forced
  GrainLocator.Unregister whose task is ignored (fire-and-forget), and
  completes the deactivation completion source with 'true'.

Test (test/Orleans.DefaultCluster.Tests/Migration/MigrationTests.cs):
  StuckDeactivationRecoveryTests sets MaxRequestProcessingTime and
  DeactivationTimeout to 250 ms and ResponseTimeout to 20 s. It blocks a grain
  inside BlockUntilReleased, requests migration to another silo, waits one
  second, and asserts that the next GetGrainAddress call reports a different
  ActivationId. It then releases the blocked call and checks that it
  completes within 10 s.

NOTE(review): this patch text was rebuilt from a whitespace-collapsed copy.
  - Leading indentation on every line is reconstructed, not original. Confirm
    against the target files, or apply with `git apply --ignore-whitespace`.
  - Generic type arguments had been stripped and were restored from usage:
    AddSiloBuilderConfigurator<Configurator>,
    GetGrain<IStuckDeactivationTestGrain>, Configure<SiloMessagingOptions>,
    Configure<GrainCollectionOptions>, ConcurrentDictionary<GrainId, BlockState>
    and ValueTask<GrainAddress>. TODO confirm against the original change.
  - The fixed one-second Task.Delay in the test is timing-dependent; consider
    polling for the new activation if the test proves flaky.

diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs
index e0899b1b3c8..02e7fa5a40b 100644
--- a/src/Orleans.Runtime/Catalog/ActivationData.cs
+++ b/src/Orleans.Runtime/Catalog/ActivationData.cs
@@ -1138,6 +1138,8 @@ void ProcessRequestsToInvalidActivation()
                         DeactivationReason = new(DeactivationReasonCode.ActivationUnresponsive,
                             $"{DeactivationReason.Description}. Activation {this} has been deactivating since {DeactivationStartTime.Value} and is likely stuck");
                     }
+
+                    AbandonStuckDeactivatingActivation();
                 }
 
                 if (!IsStuckDeactivating && !IsStuckProcessingMessage)
@@ -1161,6 +1163,20 @@ void ProcessRequestsToInvalidActivation()
             }
         }
 
+        void AbandonStuckDeactivatingActivation()
+        {
+            var forwardingAddress = ForwardingAddress;
+            LogWarningAbandoningStuckDeactivatingActivation(_shared.Logger, this, forwardingAddress);
+
+            // The migration target is not proven until deactivation reaches StartMigrationAsync.
+            // If deactivation is stuck before then, re-address messages instead of forwarding to a
+            // target which may not have a valid replacement activation.
+            ForwardingAddress = null;
+            UnregisterMessageTarget();
+            _shared.InternalRuntime.GrainLocator.Unregister(Address, UnregistrationCause.Force).Ignore();
+            GetDeactivationCompletionSource().TrySetResult(true);
+        }
+
         bool MayInvokeRequest(Message incoming)
         {
             if (!IsCurrentlyExecuting)
@@ -2589,6 +2605,11 @@ private static partial void LogWarningDispatcher_ExtendedMessageProcessing(
         Message blockingRequest,
         Message message);
 
+    [LoggerMessage(
+        Level = LogLevel.Warning,
+        Message = "Abandoning stuck deactivating activation {Activation}. ForwardingAddress={ForwardingAddress}")]
+    private static partial void LogWarningAbandoningStuckDeactivatingActivation(ILogger logger, ActivationData activation, SiloAddress? forwardingAddress);
+
     private readonly struct ActivationDataLogValue(ActivationData activation, bool includeExtraDetails = false)
     {
         public override string ToString() => activation.ToDetailedString(includeExtraDetails);
diff --git a/test/Orleans.DefaultCluster.Tests/Migration/MigrationTests.cs b/test/Orleans.DefaultCluster.Tests/Migration/MigrationTests.cs
index 146fb3d4395..2149d83dbba 100644
--- a/test/Orleans.DefaultCluster.Tests/Migration/MigrationTests.cs
+++ b/test/Orleans.DefaultCluster.Tests/Migration/MigrationTests.cs
@@ -1,5 +1,8 @@
+using System.Collections.Concurrent;
 using System.Diagnostics;
 using Microsoft.Extensions.DependencyInjection;
+using Orleans.Concurrency;
+using Orleans.Configuration;
 using Orleans.Core.Internal;
 using Orleans.Placement;
 using Orleans.Runtime;
@@ -302,6 +305,110 @@ public async Task FailRehydrationTest()
         }
     }
 
+    public class StuckDeactivationRecoveryTests : TestClusterPerTest
+    {
+        protected override void ConfigureTestCluster(TestClusterBuilder builder)
+        {
+            builder.AddSiloBuilderConfigurator<Configurator>();
+        }
+
+        [Fact, TestCategory("BVT")]
+        public async Task StuckDeactivatingActivationIsAbandoned()
+        {
+            var grain = GrainFactory.GetGrain<IStuckDeactivationTestGrain>(GetRandomGrainId());
+            var grainId = grain.GetGrainId();
+            var originalAddress = await grain.GetGrainAddress();
+            var targetHost = HostedCluster.GetActiveSilos().Select(s => s.SiloAddress).First(address => !address.Equals(originalAddress.SiloAddress));
+            var blockingCall = grain.BlockUntilReleased();
+
+            try
+            {
+                await grain.WaitUntilBlocked().WaitAsync(TimeSpan.FromSeconds(10));
+                await grain.StartMigrationTo(targetHost).AsTask().WaitAsync(TimeSpan.FromSeconds(10));
+                await Task.Delay(TimeSpan.FromSeconds(1));
+
+                var newAddress = await grain.GetGrainAddress().AsTask().WaitAsync(TimeSpan.FromSeconds(10));
+
+                Assert.NotEqual(originalAddress.ActivationId, newAddress.ActivationId);
+            }
+            finally
+            {
+                StuckDeactivationTestGrain.Release(grainId);
+            }
+
+            var completed = await Task.WhenAny(blockingCall, Task.Delay(TimeSpan.FromSeconds(10)));
+            Assert.Same(blockingCall, completed);
+            await blockingCall;
+        }
+
+        private sealed class Configurator : ISiloConfigurator
+        {
+            public void Configure(ISiloBuilder hostBuilder)
+            {
+                hostBuilder
+                    .Configure<SiloMessagingOptions>(options =>
+                    {
+                        options.MaxRequestProcessingTime = TimeSpan.FromMilliseconds(250);
+                        options.ResponseTimeout = TimeSpan.FromSeconds(20);
+                    })
+                    .Configure<GrainCollectionOptions>(options => options.DeactivationTimeout = TimeSpan.FromMilliseconds(250));
+            }
+        }
+    }
+
+    public interface IStuckDeactivationTestGrain : IGrainWithIntegerKey
+    {
+        Task BlockUntilReleased();
+
+        [AlwaysInterleave]
+        Task WaitUntilBlocked();
+
+        [AlwaysInterleave]
+        ValueTask StartMigrationTo(SiloAddress targetHost);
+
+        ValueTask<GrainAddress> GetGrainAddress();
+    }
+
+    [RandomPlacement]
+    public class StuckDeactivationTestGrain : Grain, IStuckDeactivationTestGrain
+    {
+        private static readonly ConcurrentDictionary<GrainId, BlockState> BlockStates = new();
+
+        public Task BlockUntilReleased()
+        {
+            var state = GetBlockState();
+            state.Started.TrySetResult();
+            return state.Released.Task;
+        }
+
+        public Task WaitUntilBlocked() => GetBlockState().Started.Task;
+
+        public ValueTask StartMigrationTo(SiloAddress targetHost)
+        {
+            RequestContext.Set(IPlacementDirector.PlacementHintKey, targetHost);
+            MigrateOnIdle();
+            return default;
+        }
+
+        public ValueTask<GrainAddress> GetGrainAddress() => new(GrainContext.Address);
+
+        public static void Release(GrainId grainId)
+        {
+            if (BlockStates.TryRemove(grainId, out var state))
+            {
+                state.Released.TrySetResult();
+            }
+        }
+
+        private BlockState GetBlockState() => BlockStates.GetOrAdd(GrainContext.GrainId, static _ => new());
+
+        private sealed class BlockState
+        {
+            public readonly TaskCompletionSource Started = new(TaskCreationOptions.RunContinuationsAsynchronously);
+            public readonly TaskCompletionSource Released = new(TaskCreationOptions.RunContinuationsAsynchronously);
+        }
+    }
+
     public interface IMigrationTestGrain : IGrainWithIntegerKey
     {
         ValueTask<GrainAddress> GetGrainAddress();