From 1c1960cd3a79309b534e404072e76dfdfe5f60d1 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Sun, 9 Nov 2025 14:09:00 -0800 Subject: [PATCH 1/3] Initiate migration before IGrainManagementExtension.MigrateOnIdle completes --- src/Orleans.Runtime/Catalog/ActivationData.cs | 72 +++++++++++-------- test/DefaultCluster.Tests/ErrorGrainTest.cs | 19 ++--- test/Grains/TestInternalGrains/ErrorGrain.cs | 20 +++--- 3 files changed, 59 insertions(+), 52 deletions(-) diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs index bcbe92279f6..d287eff954a 100644 --- a/src/Orleans.Runtime/Catalog/ActivationData.cs +++ b/src/Orleans.Runtime/Catalog/ActivationData.cs @@ -517,6 +517,11 @@ private void CancelPendingOperations() } public void Migrate(Dictionary? requestContext, CancellationToken cancellationToken = default) + { + StartMigratingOnScheduler(requestContext, tcs: null, cancellationToken); + } + + private void StartMigratingOnScheduler(Dictionary? requestContext, TaskCompletionSource? tcs, CancellationToken cancellationToken) { var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); cts.CancelAfter(_shared.InternalRuntime.CollectionOptions.Value.DeactivationTimeout); @@ -525,50 +530,59 @@ public void Migrate(Dictionary? requestContext, CancellationToke { // The grain is executing and is already deactivating, so just set the migration context and return. StartMigratingCore(requestContext, null); + tcs?.TrySetResult(); } else { // We use a named work item since it is cheaper than allocating a Task and has the benefit of being named. - _workItemGroup.QueueWorkItem(new MigrateWorkItem(this, requestContext, cts)); + var workItem = new MigrateWorkItem(this, requestContext, tcs, cts); + _workItemGroup.QueueWorkItem(workItem); } } - private async Task StartMigratingAsync(Dictionary? requestContext, CancellationTokenSource cts) + private async Task StartMigratingAsync(Dictionary? requestContext, TaskCompletionSource? deactivationInitiatedTcs, CancellationTokenSource cts) { - lock (this) - { - if (State is not (ActivationState.Activating or ActivationState.Valid or ActivationState.Deactivating)) - { - return; - } - } - try { - var newLocation = await PlaceMigratingGrainAsync(requestContext, cts.Token); - if (newLocation is null) + lock (this) { - // Will not deactivate/migrate. - return; + if (State is not (ActivationState.Activating or ActivationState.Valid or ActivationState.Deactivating)) + { + return; + } } - lock (this) + try { - if (!DeactivateCore(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location."), cts.Token)) + var newLocation = await PlaceMigratingGrainAsync(requestContext, cts.Token); + if (newLocation is null) { - // Grain is not able to start deactivating or has already completed. + // Will not deactivate/migrate. return; } - StartMigratingCore(requestContext, newLocation); - } + lock (this) + { + if (!DeactivateCore(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location."), cts.Token)) + { + // Grain is not able to start deactivating or has already completed. + return; + } + + StartMigratingCore(requestContext, newLocation); + } - LogDebugMigrating(_shared.Logger, GrainId, newLocation); + LogDebugMigrating(_shared.Logger, GrainId, newLocation); + } + catch (Exception exception) + { + LogErrorSelectingMigrationDestination(_shared.Logger, exception, GrainId); + return; + } } - catch (Exception exception) + finally { - LogErrorSelectingMigrationDestination(_shared.Logger, exception, GrainId); - return; + deactivationInitiatedTcs?.TrySetResult(); } } @@ -615,7 +629,7 @@ private void StartMigratingCore(Dictionary? requestContext, Silo public void Deactivate(DeactivationReason reason, CancellationToken cancellationToken = default) => DeactivateCore(reason, cancellationToken); - public bool DeactivateCore(DeactivationReason reason, CancellationToken cancellationToken) + private bool DeactivateCore(DeactivationReason reason, CancellationToken cancellationToken) { lock (this) { @@ -1874,8 +1888,9 @@ ValueTask IGrainManagementExtension.DeactivateOnIdle() ValueTask IGrainManagementExtension.MigrateOnIdle() { - Migrate(RequestContext.CallContextData?.Value.Values, CancellationToken.None); - return default; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + StartMigratingOnScheduler(RequestContext.CallContextData?.Value.Values, tcs, CancellationToken.None); + return new(tcs.Task); } private void UnregisterMessageTarget() @@ -2202,13 +2217,14 @@ private class DehydrationContextHolder(SerializerSessionPool sessionPool, Dictio public readonly Dictionary? RequestContext = requestContext; } - private class MigrateWorkItem(ActivationData activation, Dictionary? requestContext, CancellationTokenSource cts) : WorkItemBase + private class MigrateWorkItem(ActivationData activation, Dictionary? requestContext, TaskCompletionSource? tcs, CancellationTokenSource cts) : WorkItemBase { + private readonly TaskCompletionSource? _tcs = tcs; public override string Name => "Migrate"; public override IGrainContext GrainContext => activation; - public override void Execute() => activation.StartMigratingAsync(requestContext, cts).Ignore(); + public override void Execute() => activation.StartMigratingAsync(requestContext, _tcs, cts).Ignore(); } [LoggerMessage( diff --git a/test/DefaultCluster.Tests/ErrorGrainTest.cs b/test/DefaultCluster.Tests/ErrorGrainTest.cs index e677536511a..08904344c80 100644 --- a/test/DefaultCluster.Tests/ErrorGrainTest.cs +++ b/test/DefaultCluster.Tests/ErrorGrainTest.cs @@ -151,21 +151,10 @@ public async Task ErrorHandlingTimedMethodWithError() var grainFullName = typeof(ErrorGrain).FullName; IErrorGrain grain = this.GrainFactory.GetGrain(GetRandomGrainId(), grainFullName); - Task promise = grain.LongMethodWithError(2000); - - // there is a race in the test here. If run in debugger, the invocation can actually finish OK - Stopwatch stopwatch = Stopwatch.StartNew(); - - await Task.Delay(1000); - Assert.False(promise.IsCompleted, "The task shouldn't have completed yet."); - - stopwatch.Stop(); - Assert.True(stopwatch.ElapsedMilliseconds >= 900, $"Waited less than 900ms: ({stopwatch.ElapsedMilliseconds}ms)"); // check that we waited at least 0.9 second - Assert.True(stopwatch.ElapsedMilliseconds <= 1300, $"Waited longer than 1300ms: ({stopwatch.ElapsedMilliseconds}ms)"); - - await Assert.ThrowsAsync(() => promise); - - Assert.True(promise.Status == TaskStatus.Faulted); + Task task = grain.LongMethodWithError(2000); + Assert.False(task.IsCompleted, "The task shouldn't have completed yet."); + await Assert.ThrowsAsync(async () => await task); + Assert.True(task.Status == TaskStatus.Faulted); } /// diff --git a/test/Grains/TestInternalGrains/ErrorGrain.cs b/test/Grains/TestInternalGrains/ErrorGrain.cs index 702f6cd5a73..28936e41448 100644 --- a/test/Grains/TestInternalGrains/ErrorGrain.cs +++ b/test/Grains/TestInternalGrains/ErrorGrain.cs @@ -49,15 +49,14 @@ public Task GetAxBError(int a, int b) throw new Exception("GetAxBError(a,b)-Exception"); } - public Task LongMethod(int waitTime) + public async Task LongMethod(int waitTime) { - Thread.Sleep(waitTime); - return Task.CompletedTask; + await Task.Delay(waitTime); } - public Task LongMethodWithError(int waitTime) + public async Task LongMethodWithError(int waitTime) { - Thread.Sleep(waitTime); + await Task.Delay(waitTime); throw new Exception("LongMethodWithError"); } @@ -100,14 +99,17 @@ public Task UnobservedErrorDelayed() logger.LogInformation("UnobservedErrorDelayed()"); bool doThrow = true; // the grain method rturns OK, but leaves some unobserved promise - Task promise = Task.Factory.StartNew(() => + Task promise = Task.Factory.StartNew(async () => { if (!doThrow) - return 0; - Thread.Sleep(100); + { + return 0L; + } + + await Task.Delay(100); logger.LogInformation("About to throw 1.5."); throw new ArgumentException("ErrorGrain left Delayed Unobserved Error 1.5."); - }); + }).Unwrap(); promise = null; GC.Collect(); GC.WaitForPendingFinalizers(); From 4decaadc6d996fb3c9abe7f86dffd60f3f9482d6 Mon Sep 17 00:00:00 2001 From: Reuben Bond <203839+ReubenBond@users.noreply.github.com> Date: Mon, 10 Nov 2025 09:17:36 -0800 Subject: [PATCH 2/3] Update test/DefaultCluster.Tests/ErrorGrainTest.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- test/DefaultCluster.Tests/ErrorGrainTest.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/DefaultCluster.Tests/ErrorGrainTest.cs b/test/DefaultCluster.Tests/ErrorGrainTest.cs index 08904344c80..e4bc37e41cd 100644 --- a/test/DefaultCluster.Tests/ErrorGrainTest.cs +++ b/test/DefaultCluster.Tests/ErrorGrainTest.cs @@ -152,7 +152,7 @@ public async Task ErrorHandlingTimedMethodWithError() IErrorGrain grain = this.GrainFactory.GetGrain(GetRandomGrainId(), grainFullName); Task task = grain.LongMethodWithError(2000); - Assert.False(task.IsCompleted, "The task shouldn't have completed yet."); + // Removed flaky assertion about task completion due to potential race condition. await Assert.ThrowsAsync(async () => await task); Assert.True(task.Status == TaskStatus.Faulted); } From 740c802acc20d0cd5ba031961d472caadcfa55e1 Mon Sep 17 00:00:00 2001 From: Reuben Bond Date: Mon, 10 Nov 2025 12:08:15 -0800 Subject: [PATCH 3/3] Initiate migration before IGrainManagementExtension.MigrateOnIdle completes --- src/Orleans.Runtime/Catalog/ActivationData.cs | 240 +++++++----------- .../Migration/MigrationTests.cs | 8 +- 2 files changed, 95 insertions(+), 153 deletions(-) diff --git a/src/Orleans.Runtime/Catalog/ActivationData.cs b/src/Orleans.Runtime/Catalog/ActivationData.cs index d287eff954a..9bcdd65eca4 100644 --- a/src/Orleans.Runtime/Catalog/ActivationData.cs +++ b/src/Orleans.Runtime/Catalog/ActivationData.cs @@ -518,125 +518,33 @@ private void CancelPendingOperations() public void Migrate(Dictionary? requestContext, CancellationToken cancellationToken = default) { - StartMigratingOnScheduler(requestContext, tcs: null, cancellationToken); - } - - private void StartMigratingOnScheduler(Dictionary? requestContext, TaskCompletionSource? tcs, CancellationToken cancellationToken) - { - var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(_shared.InternalRuntime.CollectionOptions.Value.DeactivationTimeout); - - if (Equals(RuntimeContext.Current) && State is ActivationState.Deactivating) - { - // The grain is executing and is already deactivating, so just set the migration context and return. - StartMigratingCore(requestContext, null); - tcs?.TrySetResult(); - } - else - { - // We use a named work item since it is cheaper than allocating a Task and has the benefit of being named. - var workItem = new MigrateWorkItem(this, requestContext, tcs, cts); - _workItemGroup.QueueWorkItem(workItem); - } - } - - private async Task StartMigratingAsync(Dictionary? requestContext, TaskCompletionSource? deactivationInitiatedTcs, CancellationTokenSource cts) - { - try + lock (this) { - lock (this) - { - if (State is not (ActivationState.Activating or ActivationState.Valid or ActivationState.Deactivating)) - { - return; - } - } - - try - { - var newLocation = await PlaceMigratingGrainAsync(requestContext, cts.Token); - if (newLocation is null) - { - // Will not deactivate/migrate. - return; - } - - lock (this) - { - if (!DeactivateCore(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location."), cts.Token)) - { - // Grain is not able to start deactivating or has already completed. - return; - } - - StartMigratingCore(requestContext, newLocation); - } - - LogDebugMigrating(_shared.Logger, GrainId, newLocation); - } - catch (Exception exception) + if (State is not (ActivationState.Activating or ActivationState.Valid or ActivationState.Deactivating)) { - LogErrorSelectingMigrationDestination(_shared.Logger, exception, GrainId); return; } - } - finally - { - deactivationInitiatedTcs?.TrySetResult(); - } - } - - private void StartMigratingCore(Dictionary? requestContext, SiloAddress? newLocation) - { - if (DehydrationContext is not null) - { - // Migration has already started. - return; - } - // Set a migration context to capture any state which should be transferred. - // Doing this signals to the deactivation process that a migration is occurring, so it is important that this happens before we begin deactivation. - DehydrationContext = new(_shared.SerializerSessionPool, requestContext); - ForwardingAddress = newLocation; - } - - private async ValueTask PlaceMigratingGrainAsync(Dictionary? requestContext, CancellationToken cancellationToken) - { - var placementService = _shared.Runtime.ServiceProvider.GetRequiredService(); - var newLocation = await placementService.PlaceGrainAsync(GrainId, requestContext, PlacementStrategy).WaitAsync(cancellationToken); + // If migration has not already been started, set a migration context to capture any state which should be transferred. + // Doing this signals to the deactivation process that a migration is occurring, so it is important that this happens before we begin deactivation. + DehydrationContext ??= new(_shared.SerializerSessionPool, requestContext); - // If a new (different) host is not selected, do not migrate. - if (newLocation == Address.SiloAddress || newLocation is null) - { - // No more appropriate silo was selected for this grain. The migration attempt will be aborted. - // This could be because this is the only (compatible) silo for the grain or because the placement director chose this - // silo for some other reason. - if (newLocation is null) - { - LogDebugPlacementStrategyFailedToSelectDestination(_shared.Logger, PlacementStrategy, GrainId); - } - else + if (State is not ActivationState.Deactivating) { - LogDebugPlacementStrategySelectedCurrentSilo(_shared.Logger, PlacementStrategy, GrainId); + // Start deactivating the grain to prepare for migration. + Deactivate(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location."), cancellationToken); } - - // Will not migrate. - return null; } - - return newLocation; } - public void Deactivate(DeactivationReason reason, CancellationToken cancellationToken = default) => DeactivateCore(reason, cancellationToken); - - private bool DeactivateCore(DeactivationReason reason, CancellationToken cancellationToken) + public void Deactivate(DeactivationReason reason, CancellationToken cancellationToken = default) { lock (this) { var state = State; if (state is ActivationState.Invalid) { - return false; + return; } if (DeactivationReason.ReasonCode == DeactivationReasonCode.None) @@ -660,8 +568,6 @@ private bool DeactivateCore(DeactivationReason reason, CancellationToken cancell ScheduleOperation(new Command.Deactivate(cts, state)); } } - - return true; } private void DeactivateStuckActivation() @@ -1723,7 +1629,7 @@ private async Task ActivateAsync(Dictionary? requestContextData, /// private async Task FinishDeactivating(ActivationState previousState, CancellationToken cancellationToken) { - var migrated = false; + var migrating = false; var encounteredError = false; try { @@ -1777,41 +1683,14 @@ private async Task FinishDeactivating(ActivationState previousState, Cancellatio && _shared.MigrationManager is { } migrationManager && !cancellationToken.IsCancellationRequested) { - try - { - ForwardingAddress ??= await PlaceMigratingGrainAsync(context.RequestContext, cancellationToken); - - if (ForwardingAddress is { } forwardingAddress) - { - // Populate the dehydration context. - if (context.RequestContext is { } requestContext) - { - RequestContextExtensions.Import(requestContext); - } - - OnDehydrate(context.MigrationContext); - - // Send the dehydration context to the target host. - await migrationManager.MigrateAsync(forwardingAddress, GrainId, context.MigrationContext).AsTask().WaitAsync(cancellationToken); - _shared.InternalRuntime.GrainLocator.UpdateCache(GrainId, forwardingAddress); - migrated = true; - } - } - catch (Exception exception) - { - LogFailedToMigrateActivation(_shared.Logger, exception, this); - } - finally - { - RequestContext.Clear(); - } + migrating = await StartMigrationAsync(context, migrationManager, cancellationToken); } // If the instance is being deactivated due to a directory failure, we should not unregister it. var isDirectoryFailure = DeactivationReason.ReasonCode is DeactivationReasonCode.DirectoryFailure; var isShuttingDown = DeactivationReason.ReasonCode is DeactivationReasonCode.ShuttingDown; - if (!migrated && IsUsingGrainDirectory && !cancellationToken.IsCancellationRequested && !isDirectoryFailure && !isShuttingDown) + if (!migrating && IsUsingGrainDirectory && !cancellationToken.IsCancellationRequested && !isDirectoryFailure && !isShuttingDown) { // Unregister from directory. // If the grain was migrated, the new activation will perform a check-and-set on the registration itself. @@ -1842,7 +1721,7 @@ private async Task FinishDeactivating(ActivationState previousState, Cancellatio { CatalogInstruments.ActivationShutdownViaDeactivateStuckActivation(); } - else if (migrated) + else if (migrating) { CatalogInstruments.ActivationShutdownViaMigration(); } @@ -1869,6 +1748,41 @@ private async Task FinishDeactivating(ActivationState previousState, Cancellatio // Signal deactivation GetDeactivationCompletionSource().TrySetResult(true); _workSignal.Signal(); + + async ValueTask StartMigrationAsync(DehydrationContextHolder context, IActivationMigrationManager migrationManager, CancellationToken cancellationToken) + { + try + { + if (ForwardingAddress is null) + { + var selectedAddress = await PlaceMigratingGrainAsync(context.RequestContext, cancellationToken); + if (selectedAddress is null) + { + return false; + } + + ForwardingAddress = selectedAddress; + } + + // Populate the dehydration context. + if (context.RequestContext is { } requestContext) + { + RequestContextExtensions.Import(requestContext); + } + + OnDehydrate(context.MigrationContext); + + // Send the dehydration context to the target host. + await migrationManager.MigrateAsync(ForwardingAddress, GrainId, context.MigrationContext).AsTask().WaitAsync(cancellationToken); + _shared.InternalRuntime.GrainLocator.UpdateCache(GrainId, ForwardingAddress); + return true; + } + catch (Exception exception) + { + LogFailedToMigrateActivation(_shared.Logger, exception, this); + return false; + } + } } private TaskCompletionSource GetDeactivationCompletionSource() @@ -1886,11 +1800,49 @@ ValueTask IGrainManagementExtension.DeactivateOnIdle() return default; } - ValueTask IGrainManagementExtension.MigrateOnIdle() + async ValueTask IGrainManagementExtension.MigrateOnIdle() + { + var requestContextData = RequestContext.CallContextData?.Value.Values; + var selectedAddress = await PlaceMigratingGrainAsync(requestContextData, CancellationToken.None); + if (selectedAddress is null) + { + return; + } + + // Only migrate if a different silo was selected. + ForwardingAddress = selectedAddress; + LogDebugMigrating(_shared.Logger, GrainId, ForwardingAddress); + Migrate(requestContextData, cancellationToken: CancellationToken.None); + } + + private async ValueTask PlaceMigratingGrainAsync(Dictionary? requestContextData, CancellationToken cancellationToken) { - var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - StartMigratingOnScheduler(RequestContext.CallContextData?.Value.Values, tcs, CancellationToken.None); - return new(tcs.Task); + try + { + var placementService = _shared.Runtime.ServiceProvider.GetRequiredService(); + var selectedAddress = await placementService.PlaceGrainAsync(GrainId, requestContextData, PlacementStrategy); + + if (selectedAddress is null) + { + // No appropriate silo was selected for this grain. + LogDebugPlacementStrategyFailedToSelectDestination(_shared.Logger, PlacementStrategy, GrainId); + return null; + } + else if (selectedAddress == _shared.Runtime.SiloAddress) + { + // This could be because this is the only (compatible) silo for the grain or because the placement director chose this + // silo for some other reason. + LogDebugPlacementStrategySelectedCurrentSilo(_shared.Logger, PlacementStrategy, GrainId); + return null; + } + + return selectedAddress; + } + catch (Exception exception) + { + LogErrorSelectingMigrationDestination(_shared.Logger, exception, GrainId); + return null; + } } private void UnregisterMessageTarget() @@ -2217,16 +2169,6 @@ private class DehydrationContextHolder(SerializerSessionPool sessionPool, Dictio public readonly Dictionary? RequestContext = requestContext; } - private class MigrateWorkItem(ActivationData activation, Dictionary? requestContext, TaskCompletionSource? tcs, CancellationTokenSource cts) : WorkItemBase - { - private readonly TaskCompletionSource? _tcs = tcs; - public override string Name => "Migrate"; - - public override IGrainContext GrainContext => activation; - - public override void Execute() => activation.StartMigratingAsync(requestContext, _tcs, cts).Ignore(); - } - [LoggerMessage( EventId = (int)ErrorCode.Catalog_Reject_ActivationTooManyRequests, Level = LogLevel.Warning, diff --git a/test/DefaultCluster.Tests/Migration/MigrationTests.cs b/test/DefaultCluster.Tests/Migration/MigrationTests.cs index 07960406313..6d00b952cf2 100644 --- a/test/DefaultCluster.Tests/Migration/MigrationTests.cs +++ b/test/DefaultCluster.Tests/Migration/MigrationTests.cs @@ -25,7 +25,6 @@ public async Task BasicGrainMigrationTest() { var grain = GrainFactory.GetGrain(GetRandomGrainId()); var expectedState = Random.Shared.Next(); - await grain.SetState(expectedState); var originalAddress = await grain.GetGrainAddress(); GrainAddress newAddress; do @@ -33,6 +32,7 @@ public async Task BasicGrainMigrationTest() // Trigger migration without setting a placement hint, so the grain placement provider will be // free to select any location including the existing one. await grain.Cast().MigrateOnIdle(); + await grain.SetState(expectedState); newAddress = await grain.GetGrainAddress(); } while (originalAddress == newAddress); @@ -250,12 +250,12 @@ public async Task FailDehydrationTest() RequestContext.Set(IPlacementDirector.PlacementHintKey, targetHost); await grain.Cast().MigrateOnIdle(); - var newAddress = await grain.GetGrainAddress(); - Assert.Equal(targetHost, newAddress.SiloAddress); - // The grain should have lost its state during the failed migration. var newState = await grain.GetState(); Assert.NotEqual(expectedState, newState); + + var newAddress = await grain.GetGrainAddress(); + Assert.Equal(targetHost, newAddress.SiloAddress); } ///