diff --git a/CHANGELOG.md b/CHANGELOG.md index ed04a2de5..f0a077c1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## v1.0.0 +- Added `SuspendInstanceAsync` and `ResumeInstanceAsync` to `DurableTaskClient`. - `TaskOrchestrationContext.CreateReplaySafeLogger` now creates `ILogger` directly (as opposed to wrapping an existing `ILogger`). - Durable Functions class-based syntax now resolves `ITaskActivity` instances from `IServiceProvider`, if available there. - `DurableTaskClient` methods have been touched up to ensure `CancellationToken` is included, as well as is the last parameter. diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 5bf51c82d..0902485fa 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -178,6 +178,33 @@ public virtual Task WaitForInstanceStartAsync( string instanceId, CancellationToken cancellation) => this.WaitForInstanceStartAsync(instanceId, false, cancellation); + /// + /// Suspends an orchestration instance, halting processing of it until is used + /// to resume the orchestration. + /// + /// The instance ID of the orchestration to suspend. + /// The optional suspension reason. + /// + /// A that can be used to cancel the suspend operation. Note, cancelling this token + /// does not resume the orchestration if suspend was successful. + /// + /// A task that completes when the suspend has been committed to the backend. + public abstract Task SuspendInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default); + + /// + /// Resumes an orchestration instance that was suspended via . + /// + /// The instance ID of the orchestration to resume. + /// The optional resume reason. + /// + /// A that can be used to cancel the resume operation. Note, cancelling this token + /// does not re-suspend the orchestration if resume was successful. + /// + /// A task that completes when the resume has been committed to the backend. + public abstract Task ResumeInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default); + /// /// Waits for an orchestration to start running and returns a /// object that contains metadata about the started instance. diff --git a/src/Client/Core/OrchestrationRuntimeStatus.cs b/src/Client/Core/OrchestrationRuntimeStatus.cs index 55ac70c26..7d25d04ba 100644 --- a/src/Client/Core/OrchestrationRuntimeStatus.cs +++ b/src/Client/Core/OrchestrationRuntimeStatus.cs @@ -44,4 +44,9 @@ public enum OrchestrationRuntimeStatus /// The orchestration was scheduled but hasn't started running. /// Pending, + + /// + /// The orchestration is in a suspended state. + /// + Suspended, } diff --git a/src/Client/Grpc/Client.Grpc.csproj b/src/Client/Grpc/Client.Grpc.csproj index 01b9434ae..19887603c 100644 --- a/src/Client/Grpc/Client.Grpc.csproj +++ b/src/Client/Grpc/Client.Grpc.csproj @@ -13,8 +13,7 @@ - - + diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index 4787e15ee..c725790af 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -136,6 +136,50 @@ await this.sidecarClient.TerminateInstanceAsync( cancellationToken: cancellation); } + /// + public override async Task SuspendInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default) + { + P.SuspendRequest request = new() + { + InstanceId = instanceId, + Reason = reason, + }; + + try + { + await this.sidecarClient.SuspendInstanceAsync( + request, cancellationToken: cancellation); + } + catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) + { + throw new OperationCanceledException( + $"The {nameof(this.SuspendInstanceAsync)} operation was canceled.", e, cancellation); + } + } + + /// + public override async Task ResumeInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default) + { + P.ResumeRequest request = new() + { + InstanceId = instanceId, + Reason = reason, + }; + + try + { + await this.sidecarClient.ResumeInstanceAsync( + request, cancellationToken: cancellation); + } + catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) + { + throw new OperationCanceledException( + $"The {nameof(this.ResumeInstanceAsync)} operation was canceled.", e, cancellation); + } + } + /// public override async Task GetInstanceMetadataAsync( string instanceId, bool getInputsAndOutputs = false, CancellationToken cancellation = default) diff --git a/src/Client/Grpc/ProtoUtils.cs b/src/Client/Grpc/ProtoUtils.cs index ccac27546..eb7d81f7a 100644 --- a/src/Client/Grpc/ProtoUtils.cs +++ b/src/Client/Grpc/ProtoUtils.cs @@ -71,6 +71,7 @@ internal static P.OrchestrationStatus ToGrpcStatus(this OrchestrationRuntimeStat OrchestrationRuntimeStatus.Pending => P.OrchestrationStatus.Pending, OrchestrationRuntimeStatus.Running => P.OrchestrationStatus.Running, OrchestrationRuntimeStatus.Terminated => P.OrchestrationStatus.Terminated, + OrchestrationRuntimeStatus.Suspended => P.OrchestrationStatus.Suspended, _ => throw new ArgumentOutOfRangeException(nameof(status), "Unexpected value"), }; #pragma warning restore 0618 // Referencing Obsolete member. diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index b2997a611..f577cf142 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -60,6 +60,12 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated: historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input); break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended: + historyEvent = new ExecutionSuspendedEvent(proto.EventId, proto.ExecutionSuspended.Input); + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed: + historyEvent = new ExecutionResumedEvent(proto.EventId, proto.ExecutionResumed.Input); + break; case P.HistoryEvent.EventTypeOneofCase.TaskScheduled: historyEvent = new TaskScheduledEvent( proto.EventId, diff --git a/src/Worker/Grpc/Worker.Grpc.csproj b/src/Worker/Grpc/Worker.Grpc.csproj index b20546cc7..10f5a7a67 100644 --- a/src/Worker/Grpc/Worker.Grpc.csproj +++ b/src/Worker/Grpc/Worker.Grpc.csproj @@ -13,10 +13,9 @@ - - + diff --git a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs index d7854c95a..4cc043467 100644 --- a/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DefaultDurableTaskClientBuilderTests.cs @@ -106,6 +106,12 @@ public override Task RaiseEventAsync( throw new NotImplementedException(); } + public override Task ResumeInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default) + { + throw new NotImplementedException(); + } + public override Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, @@ -115,6 +121,12 @@ public override Task ScheduleNewOrchestrationInstanceAsync( throw new NotImplementedException(); } + public override Task SuspendInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default) + { + throw new NotImplementedException(); + } + public override Task TerminateAsync( string instanceId, object? output = null, CancellationToken cancellation = default) { diff --git a/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs b/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs index 2a4cf1b1d..9c85b716d 100644 --- a/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs +++ b/test/Client/Core.Tests/DependencyInjection/DurableTaskClientBuilderExtensionsTests.cs @@ -136,6 +136,12 @@ public override Task RaiseEventAsync( throw new NotImplementedException(); } + public override Task ResumeInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default) + { + throw new NotImplementedException(); + } + public override Task ScheduleNewOrchestrationInstanceAsync( TaskName orchestratorName, object? input = null, @@ -145,6 +151,12 @@ public override Task ScheduleNewOrchestrationInstanceAsync( throw new NotImplementedException(); } + public override Task SuspendInstanceAsync( + string instanceId, string? reason = null, CancellationToken cancellation = default) + { + throw new NotImplementedException(); + } + public override Task TerminateAsync( string instanceId, object? output = null, CancellationToken cancellation = default) {