diff --git a/src/Dapr.Workflow/Worker/WorkflowWorker.cs b/src/Dapr.Workflow/Worker/WorkflowWorker.cs index a41d84743..9b12b2371 100644 --- a/src/Dapr.Workflow/Worker/WorkflowWorker.cs +++ b/src/Dapr.Workflow/Worker/WorkflowWorker.cs @@ -113,10 +113,10 @@ private async Task HandleOrchestratorResponseAsync(Orchest } } - //If the most recent event is `ExecutionTerminated`, acknowledge termination immediately + // If the most recent event is `ExecutionTerminated`, acknowledge termination immediately. var timelineEvents = allPastEvents.Concat(request.NewEvents).ToList(); var latestEvent = timelineEvents.Count > 0 ? timelineEvents[^1] : null; - + if (latestEvent?.ExecutionTerminated != null) { return new OrchestratorResponse @@ -135,6 +135,17 @@ private async Task HandleOrchestratorResponseAsync(Orchest } }; } + + // If the instance is suspended, acknowledge the work item without running the orchestrator. + // This keeps the workflow paused while still committing the suspension event. + if (latestEvent?.ExecutionSuspended != null) + { + return new OrchestratorResponse + { + InstanceId = request.InstanceId, + CompletionToken = completionToken + }; + } // Create a new version tracker for this turn var versionTracker = new WorkflowVersionTracker(allPastEvents); diff --git a/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs b/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs index 1f2989986..629c93516 100644 --- a/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs @@ -21,6 +21,48 @@ namespace Dapr.IntegrationTest.Workflow; public sealed class PauseResumeTests { + [Fact] + public async Task ShouldReportPausedStatusWhenWorkflowIsSuspended() + { + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(needsActorState: true); + await environment.StartAsync(); + + var harness = new DaprHarnessBuilder(componentsDir) + .WithEnvironment(environment) + .BuildWorkflow(); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => opt.RegisterWorkflow(), + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(WaitingWorkflow), workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + // Pause the workflow + await daprWorkflowClient.SuspendWorkflowAsync(workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + var pausedState = await daprWorkflowClient.GetWorkflowStateAsync(workflowInstanceId); + Assert.NotNull(pausedState); + Assert.Equal(WorkflowRuntimeStatus.Suspended, pausedState.RuntimeStatus); + } + [Fact] public async Task ShouldPauseAndResumeWorkflow() { diff --git a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs index 0aafafb88..7d2481bf9 100644 --- a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs @@ -253,6 +253,88 @@ public async Task HandleOrchestratorResponseAsync_ShouldNotReturnTerminatedCompl Assert.NotEqual(OrchestrationStatus.Terminated, action.CompleteOrchestration!.OrchestrationStatus); Assert.Equal(OrchestrationStatus.Failed, action.CompleteOrchestration.OrchestrationStatus); } + + [Fact] + public async Task HandleOrchestratorResponseAsync_ShouldReturnEmptyResponse_WhenLatestEventIsExecutionSuspended() + { + var sp = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + new StubWorkflowsFactory(), + NullLoggerFactory.Instance, + serializer, + sp, + options); + + var request = new OrchestratorRequest + { + InstanceId = "i", + PastEvents = + { + new HistoryEvent + { + ExecutionStarted = new ExecutionStartedEvent { Name = "wf-not-registered", Input = "" } + } + }, + NewEvents = + { + new HistoryEvent + { + ExecutionSuspended = new ExecutionSuspendedEvent() + } + } + }; + + var response = await InvokeHandleOrchestratorResponseAsync(worker, request); + + Assert.Equal("i", response.InstanceId); + Assert.Empty(response.Actions); + } + + [Fact] + public async Task HandleOrchestratorResponseAsync_ShouldNotShortCircuit_WhenLatestEventIsExecutionResumed() + { + var sp = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + new StubWorkflowsFactory(), + NullLoggerFactory.Instance, + serializer, + sp, + options); + + var request = new OrchestratorRequest + { + InstanceId = "i", + PastEvents = + { + new HistoryEvent + { + ExecutionStarted = new ExecutionStartedEvent { Name = "wf-not-registered", Input = "" } + } + }, + NewEvents = + { + new HistoryEvent + { + ExecutionResumed = new ExecutionResumedEvent() + } + } + }; + + var response = await InvokeHandleOrchestratorResponseAsync(worker, request); + + Assert.Equal("i", response.InstanceId); + var action = Assert.Single(response.Actions); + Assert.NotNull(action.CompleteOrchestration); + Assert.Equal(OrchestrationStatus.Failed, action.CompleteOrchestration!.OrchestrationStatus); + } [Fact] public async Task ExecuteAsync_ShouldSwallowOperationCanceledException_WhenStoppingTokenIsCanceled()