From f17dfa43f360923edea13bd3be3ea05b1ca521b4 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Fri, 6 Mar 2026 05:23:35 -0600 Subject: [PATCH 1/2] Added suspend and resume checks in addition to termination on worker Signed-off-by: Whit Waldo --- src/Dapr.Workflow/Worker/WorkflowWorker.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Dapr.Workflow/Worker/WorkflowWorker.cs b/src/Dapr.Workflow/Worker/WorkflowWorker.cs index a41d84743..143577cdd 100644 --- a/src/Dapr.Workflow/Worker/WorkflowWorker.cs +++ b/src/Dapr.Workflow/Worker/WorkflowWorker.cs @@ -113,11 +113,14 @@ private async Task HandleOrchestratorResponseAsync(Orchest } } - //If the most recent event is `ExecutionTerminated`, acknowledge termination immediately + //If the most recent event is `ExecutionTerminated`, `ExecutionSuspended` or `ExecutionResumed`, acknowledge termination immediately var timelineEvents = allPastEvents.Concat(request.NewEvents).ToList(); var latestEvent = timelineEvents.Count > 0 ? timelineEvents[^1] : null; - - if (latestEvent?.ExecutionTerminated != null) + + OrchestrationStatus? status = latestEvent?.ExecutionTerminated != null ? OrchestrationStatus.Terminated : + latestEvent?.ExecutionSuspended != null ? OrchestrationStatus.Suspended : + latestEvent?.ExecutionResumed != null ? OrchestrationStatus.Running : null; + if (status is not null) { return new OrchestratorResponse { @@ -129,7 +132,7 @@ private async Task HandleOrchestratorResponseAsync(Orchest { CompleteOrchestration = new CompleteOrchestrationAction { - OrchestrationStatus = OrchestrationStatus.Terminated + OrchestrationStatus = (OrchestrationStatus)status } } } From 7e24224b44fee2c0c20fa2bac2cf5733f2456a09 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Fri, 6 Mar 2026 05:45:23 -0600 Subject: [PATCH 2/2] Fixed pause/resume and added integration test Signed-off-by: Whit Waldo --- src/Dapr.Workflow/Worker/WorkflowWorker.cs | 20 +++-- .../PauseResumeTests.cs | 42 ++++++++++ .../Worker/WorkflowWorkerTests.cs | 82 +++++++++++++++++++ 3 files changed, 138 insertions(+), 6 deletions(-) diff --git a/src/Dapr.Workflow/Worker/WorkflowWorker.cs b/src/Dapr.Workflow/Worker/WorkflowWorker.cs index 143577cdd..9b12b2371 100644 --- a/src/Dapr.Workflow/Worker/WorkflowWorker.cs +++ b/src/Dapr.Workflow/Worker/WorkflowWorker.cs @@ -113,14 +113,11 @@ private async Task HandleOrchestratorResponseAsync(Orchest } } - //If the most recent event is `ExecutionTerminated`, `ExecutionSuspended` or `ExecutionResumed`, acknowledge termination immediately + // If the most recent event is `ExecutionTerminated`, acknowledge termination immediately. var timelineEvents = allPastEvents.Concat(request.NewEvents).ToList(); var latestEvent = timelineEvents.Count > 0 ? timelineEvents[^1] : null; - OrchestrationStatus? status = latestEvent?.ExecutionTerminated != null ? OrchestrationStatus.Terminated : - latestEvent?.ExecutionSuspended != null ? OrchestrationStatus.Suspended : - latestEvent?.ExecutionResumed != null ? OrchestrationStatus.Running : null; - if (status is not null) + if (latestEvent?.ExecutionTerminated != null) { return new OrchestratorResponse { @@ -132,12 +129,23 @@ private async Task HandleOrchestratorResponseAsync(Orchest { CompleteOrchestration = new CompleteOrchestrationAction { - OrchestrationStatus = (OrchestrationStatus)status + OrchestrationStatus = OrchestrationStatus.Terminated } } } }; } + + // If the instance is suspended, acknowledge the work item without running the orchestrator. + // This keeps the workflow paused while still committing the suspension event. + if (latestEvent?.ExecutionSuspended != null) + { + return new OrchestratorResponse + { + InstanceId = request.InstanceId, + CompletionToken = completionToken + }; + } // Create a new version tracker for this turn var versionTracker = new WorkflowVersionTracker(allPastEvents); diff --git a/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs b/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs index 1f2989986..629c93516 100644 --- a/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs @@ -21,6 +21,48 @@ namespace Dapr.IntegrationTest.Workflow; public sealed class PauseResumeTests { + [Fact] + public async Task ShouldReportPausedStatusWhenWorkflowIsSuspended() + { + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(needsActorState: true); + await environment.StartAsync(); + + var harness = new DaprHarnessBuilder(componentsDir) + .WithEnvironment(environment) + .BuildWorkflow(); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => opt.RegisterWorkflow(), + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(WaitingWorkflow), workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + // Pause the workflow + await daprWorkflowClient.SuspendWorkflowAsync(workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + var pausedState = await daprWorkflowClient.GetWorkflowStateAsync(workflowInstanceId); + Assert.NotNull(pausedState); + Assert.Equal(WorkflowRuntimeStatus.Suspended, pausedState.RuntimeStatus); + } + [Fact] public async Task ShouldPauseAndResumeWorkflow() { diff --git a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs index 0aafafb88..7d2481bf9 100644 --- a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs @@ -253,6 +253,88 @@ public async Task HandleOrchestratorResponseAsync_ShouldNotReturnTerminatedCompl Assert.NotEqual(OrchestrationStatus.Terminated, action.CompleteOrchestration!.OrchestrationStatus); Assert.Equal(OrchestrationStatus.Failed, action.CompleteOrchestration.OrchestrationStatus); } + + [Fact] + public async Task HandleOrchestratorResponseAsync_ShouldReturnEmptyResponse_WhenLatestEventIsExecutionSuspended() + { + var sp = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + new StubWorkflowsFactory(), + NullLoggerFactory.Instance, + serializer, + sp, + options); + + var request = new OrchestratorRequest + { + InstanceId = "i", + PastEvents = + { + new HistoryEvent + { + ExecutionStarted = new ExecutionStartedEvent { Name = "wf-not-registered", Input = "" } + } + }, + NewEvents = + { + new HistoryEvent + { + ExecutionSuspended = new ExecutionSuspendedEvent() + } + } + }; + + var response = await InvokeHandleOrchestratorResponseAsync(worker, request); + + Assert.Equal("i", response.InstanceId); + Assert.Empty(response.Actions); + } + + [Fact] + public async Task HandleOrchestratorResponseAsync_ShouldNotShortCircuit_WhenLatestEventIsExecutionResumed() + { + var sp = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + new StubWorkflowsFactory(), + NullLoggerFactory.Instance, + serializer, + sp, + options); + + var request = new OrchestratorRequest + { + InstanceId = "i", + PastEvents = + { + new HistoryEvent + { + ExecutionStarted = new ExecutionStartedEvent { Name = "wf-not-registered", Input = "" } + } + }, + NewEvents = + { + new HistoryEvent + { + ExecutionResumed = new ExecutionResumedEvent() + } + } + }; + + var response = await InvokeHandleOrchestratorResponseAsync(worker, request); + + Assert.Equal("i", response.InstanceId); + var action = Assert.Single(response.Actions); + Assert.NotNull(action.CompleteOrchestration); + Assert.Equal(OrchestrationStatus.Failed, action.CompleteOrchestration!.OrchestrationStatus); + } [Fact] public async Task ExecuteAsync_ShouldSwallowOperationCanceledException_WhenStoppingTokenIsCanceled()