diff --git a/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs b/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs index b29a92699..f7af36fdb 100644 --- a/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs +++ b/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs @@ -97,10 +97,12 @@ private async Task ReceiveLoopAsync( // Dispatch based on work item type var workItemTask = workItem.RequestCase switch { - WorkItem.RequestOneofCase.OrchestratorRequest => ProcessWorkflowAsync(workItem.OrchestratorRequest, - orchestratorHandler, cancellationToken), - WorkItem.RequestOneofCase.ActivityRequest => ProcessActivityAsync(workItem.ActivityRequest, - activityHandler, cancellationToken), + WorkItem.RequestOneofCase.OrchestratorRequest => Task.Run( + () => ProcessWorkflowAsync(workItem.OrchestratorRequest, orchestratorHandler, cancellationToken), + cancellationToken), + WorkItem.RequestOneofCase.ActivityRequest => Task.Run( + () => ProcessActivityAsync(workItem.ActivityRequest, activityHandler, cancellationToken), + cancellationToken), _ => Task.Run( () => _logger.LogGrpcProtocolHandlerUnknownWorkItemType(workItem.RequestCase), cancellationToken) diff --git a/test/Dapr.IntegrationTest.Workflow/ActivitySleepTests.cs b/test/Dapr.IntegrationTest.Workflow/ActivitySleepTests.cs new file mode 100644 index 000000000..8b2cf7a99 --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/ActivitySleepTests.cs @@ -0,0 +1,101 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class ActivitySleepTests +{ + [Fact] + public async Task ShouldHandleActivitySleep() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId1 = Guid.NewGuid().ToString(); + var workflowInstanceId2 = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + // Clean test logic + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + // Start the workflow + const int startingValue = 8; + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(Test1Workflow), workflowInstanceId1, startingValue); + + var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(Test2Workflow), workflowInstanceId2, startingValue, null, cts1.Token); + + var state = await daprWorkflowClient.GetWorkflowStateAsync(workflowInstanceId1); + Assert.NotNull(state); + Assert.Equal(WorkflowRuntimeStatus.Running, state.RuntimeStatus); + + var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(30)); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId2, true, cts2.Token); + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var resultValue = result.ReadOutputAs(); + Assert.Equal(9, resultValue); + } + + private sealed class SleepActivity : WorkflowActivity + { + public override Task RunAsync(WorkflowActivityContext context, int input) + { + Thread.Sleep(int.MaxValue); + return Task.FromResult(input); + } + } + + private sealed class Test1Workflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, int input) + { + await context.CallActivityAsync(nameof(SleepActivity), input); + return 0; + } + } + + private sealed class Test2Workflow : Workflow + { + public override Task RunAsync(WorkflowContext context, int input) + { + return Task.FromResult(input + 1); + } + } +}