Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 44 additions & 28 deletions src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,11 @@ private void CancelPendingOperations()
}

public void Migrate(Dictionary<string, object>? requestContext, CancellationToken cancellationToken = default)
{
StartMigratingOnScheduler(requestContext, tcs: null, cancellationToken);
}

private void StartMigratingOnScheduler(Dictionary<string, object>? requestContext, TaskCompletionSource? tcs, CancellationToken cancellationToken)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(_shared.InternalRuntime.CollectionOptions.Value.DeactivationTimeout);
Expand All @@ -525,50 +530,59 @@ public void Migrate(Dictionary<string, object>? requestContext, CancellationToke
{
// The grain is executing and is already deactivating, so just set the migration context and return.
StartMigratingCore(requestContext, null);
tcs?.TrySetResult();
}
else
{
// We use a named work item since it is cheaper than allocating a Task and has the benefit of being named.
_workItemGroup.QueueWorkItem(new MigrateWorkItem(this, requestContext, cts));
var workItem = new MigrateWorkItem(this, requestContext, tcs, cts);
_workItemGroup.QueueWorkItem(workItem);
}
}

private async Task StartMigratingAsync(Dictionary<string, object>? requestContext, CancellationTokenSource cts)
private async Task StartMigratingAsync(Dictionary<string, object>? requestContext, TaskCompletionSource? deactivationInitiatedTcs, CancellationTokenSource cts)
{
lock (this)
{
if (State is not (ActivationState.Activating or ActivationState.Valid or ActivationState.Deactivating))
{
return;
}
}

try
{
var newLocation = await PlaceMigratingGrainAsync(requestContext, cts.Token);
if (newLocation is null)
lock (this)
{
// Will not deactivate/migrate.
return;
if (State is not (ActivationState.Activating or ActivationState.Valid or ActivationState.Deactivating))
{
return;
}
}

lock (this)
try
{
if (!DeactivateCore(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location."), cts.Token))
var newLocation = await PlaceMigratingGrainAsync(requestContext, cts.Token);
if (newLocation is null)
{
// Grain is not able to start deactivating or has already completed.
// Will not deactivate/migrate.
return;
}

StartMigratingCore(requestContext, newLocation);
}
lock (this)
{
if (!DeactivateCore(new DeactivationReason(DeactivationReasonCode.Migrating, "Migrating to a new location."), cts.Token))
{
// Grain is not able to start deactivating or has already completed.
return;
}

StartMigratingCore(requestContext, newLocation);
}

LogDebugMigrating(_shared.Logger, GrainId, newLocation);
LogDebugMigrating(_shared.Logger, GrainId, newLocation);
}
catch (Exception exception)
{
LogErrorSelectingMigrationDestination(_shared.Logger, exception, GrainId);
return;
}
}
catch (Exception exception)
finally
{
LogErrorSelectingMigrationDestination(_shared.Logger, exception, GrainId);
return;
deactivationInitiatedTcs?.TrySetResult();
}
}

Expand Down Expand Up @@ -615,7 +629,7 @@ private void StartMigratingCore(Dictionary<string, object>? requestContext, Silo

public void Deactivate(DeactivationReason reason, CancellationToken cancellationToken = default) => DeactivateCore(reason, cancellationToken);

public bool DeactivateCore(DeactivationReason reason, CancellationToken cancellationToken)
private bool DeactivateCore(DeactivationReason reason, CancellationToken cancellationToken)
{
lock (this)
{
Expand Down Expand Up @@ -1874,8 +1888,9 @@ ValueTask IGrainManagementExtension.DeactivateOnIdle()

ValueTask IGrainManagementExtension.MigrateOnIdle()
{
Migrate(RequestContext.CallContextData?.Value.Values, CancellationToken.None);
return default;
var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
StartMigratingOnScheduler(RequestContext.CallContextData?.Value.Values, tcs, CancellationToken.None);
return new(tcs.Task);
}

private void UnregisterMessageTarget()
Expand Down Expand Up @@ -2202,13 +2217,14 @@ private class DehydrationContextHolder(SerializerSessionPool sessionPool, Dictio
public readonly Dictionary<string, object>? RequestContext = requestContext;
}

private class MigrateWorkItem(ActivationData activation, Dictionary<string, object>? requestContext, CancellationTokenSource cts) : WorkItemBase
private class MigrateWorkItem(ActivationData activation, Dictionary<string, object>? requestContext, TaskCompletionSource? tcs, CancellationTokenSource cts) : WorkItemBase
{
private readonly TaskCompletionSource? _tcs = tcs;
public override string Name => "Migrate";

public override IGrainContext GrainContext => activation;

public override void Execute() => activation.StartMigratingAsync(requestContext, cts).Ignore();
public override void Execute() => activation.StartMigratingAsync(requestContext, _tcs, cts).Ignore();
}

[LoggerMessage(
Expand Down
19 changes: 4 additions & 15 deletions test/DefaultCluster.Tests/ErrorGrainTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,21 +151,10 @@ public async Task ErrorHandlingTimedMethodWithError()
var grainFullName = typeof(ErrorGrain).FullName;
IErrorGrain grain = this.GrainFactory.GetGrain<IErrorGrain>(GetRandomGrainId(), grainFullName);

Task promise = grain.LongMethodWithError(2000);

// there is a race in the test here. If run in debugger, the invocation can actually finish OK
Stopwatch stopwatch = Stopwatch.StartNew();

await Task.Delay(1000);
Assert.False(promise.IsCompleted, "The task shouldn't have completed yet.");

stopwatch.Stop();
Assert.True(stopwatch.ElapsedMilliseconds >= 900, $"Waited less than 900ms: ({stopwatch.ElapsedMilliseconds}ms)"); // check that we waited at least 0.9 second
Assert.True(stopwatch.ElapsedMilliseconds <= 1300, $"Waited longer than 1300ms: ({stopwatch.ElapsedMilliseconds}ms)");

await Assert.ThrowsAsync<Exception>(() => promise);

Assert.True(promise.Status == TaskStatus.Faulted);
Task task = grain.LongMethodWithError(2000);
Assert.False(task.IsCompleted, "The task shouldn't have completed yet.");
Comment thread
ReubenBond marked this conversation as resolved.
Outdated
await Assert.ThrowsAsync<Exception>(async () => await task);
Assert.True(task.Status == TaskStatus.Faulted);
}

/// <summary>
Expand Down
20 changes: 11 additions & 9 deletions test/Grains/TestInternalGrains/ErrorGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,14 @@ public Task<int> GetAxBError(int a, int b)
throw new Exception("GetAxBError(a,b)-Exception");
}

public Task LongMethod(int waitTime)
public async Task LongMethod(int waitTime)
{
Thread.Sleep(waitTime);
return Task.CompletedTask;
await Task.Delay(waitTime);
}

public Task LongMethodWithError(int waitTime)
public async Task LongMethodWithError(int waitTime)
{
Thread.Sleep(waitTime);
await Task.Delay(waitTime);
throw new Exception("LongMethodWithError");
}

Expand Down Expand Up @@ -100,14 +99,17 @@ public Task<int> UnobservedErrorDelayed()
logger.LogInformation("UnobservedErrorDelayed()");
bool doThrow = true;
// the grain method rturns OK, but leaves some unobserved promise
Task<long> promise = Task<long>.Factory.StartNew(() =>
Task<long> promise = Task.Factory.StartNew(async () =>
{
if (!doThrow)
return 0;
Thread.Sleep(100);
{
return 0L;
}

await Task.Delay(100);
logger.LogInformation("About to throw 1.5.");
throw new ArgumentException("ErrorGrain left Delayed Unobserved Error 1.5.");
});
}).Unwrap();
promise = null;
GC.Collect();
GC.WaitForPendingFinalizers();
Expand Down
Loading