diff --git a/test/Dapr.IntegrationTest.Workflow/ContinueAsNewTests.cs b/test/Dapr.IntegrationTest.Workflow/ContinueAsNewTests.cs new file mode 100644 index 000000000..6066048f4 --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/ContinueAsNewTests.cs @@ -0,0 +1,69 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class ContinueAsNewTests +{ + [Fact] + public async Task ShouldContinueAsNewUntilComplete() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => opt.RegisterWorkflow(), + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(CounterWorkflow), workflowInstanceId, 0); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var finalCount = result.ReadOutputAs(); + Assert.Equal(5, finalCount); + } + + private sealed class CounterWorkflow : Workflow + { + public override Task RunAsync(WorkflowContext context, int currentCount) + { + if (currentCount < 5) + { + context.ContinueAsNew(currentCount + 1); + } + return Task.FromResult(currentCount); + } + } +} diff --git a/test/Dapr.IntegrationTest.Workflow/ErrorHandlingTests.cs b/test/Dapr.IntegrationTest.Workflow/ErrorHandlingTests.cs new file mode 100644 index 000000000..e6556c17b --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/ErrorHandlingTests.cs @@ -0,0 +1,152 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class ErrorHandlingTests +{ + [Fact] + public async Task ShouldRetryFailedActivityAndSucceed() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => + { + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(RetryWorkflow), workflowInstanceId, 0); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var output = result.ReadOutputAs(); + Assert.Equal("Success after retries", output); + } + + [Fact] + public async Task ShouldCancelTimerOnExternalEvent() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => opt.RegisterWorkflow(), + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + var startTime = DateTime.UtcNow; + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(CancellableTimerWorkflow), workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + // Cancel the timer by raising an event + await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "CancelTimer"); + + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + var duration = DateTime.UtcNow - startTime; + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var output = result.ReadOutputAs(); + Assert.Equal("Cancelled", output); + + // Verify it completed quickly (not after full timer) + Assert.True(duration.TotalSeconds < 10, "Timer should have been cancelled early"); + } + + private sealed class CancellableTimerWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, object? input) + { + var timerTask = context.CreateTimer(TimeSpan.FromMinutes(5)); + var eventTask = context.WaitForExternalEventAsync("CancelTimer"); + + var completedTask = await Task.WhenAny(timerTask, eventTask); + + if (completedTask == eventTask) + { + return "Cancelled"; + } + + return "Completed"; + } + } + + private sealed class RetryWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, int attemptNumber) + { + try + { + await context.CallActivityAsync(nameof(FlakyActivity), attemptNumber); + return "Success after retries"; + } + catch (Exception) when (attemptNumber < 2) + { + // Retry by continuing the workflow with incremented attempt + context.ContinueAsNew(attemptNumber + 1); + return string.Empty; // Won't be reached + } + } + } + + private sealed class FlakyActivity : WorkflowActivity + { + public override Task RunAsync(WorkflowActivityContext context, int attemptCount) + { + if (attemptCount < 2) + throw new InvalidOperationException("Simulated failure"); + + return Task.FromResult("Success"); + } + } +} diff --git a/test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs b/test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs index 5a7f1fe06..1c79e6272 100644 --- a/test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs @@ -23,13 +23,56 @@ namespace Dapr.IntegrationTest.Workflow; public sealed partial class ExternalInputWorkflowTests { - private List BaseInventory = + private readonly List BaseInventory = [ new("Paperclips", 5, 100), new("Cars", 15000, 100), new("Computers", 500, 100) ]; + [Fact] + public async Task ShouldHandleMultipleExternalEvents_Simple() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(MultiEventWorkflow), workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + // Raise multiple events + await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event1", "FirstData"); + await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event2", 42); + await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event3", true); + + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var output = result.ReadOutputAs(); + Assert.Equal("FirstData-42-True", output); + } + [Fact] public async Task ShouldHandleStandardWorkflowsWithDependencyInjection() { @@ -125,6 +168,61 @@ public enum ApprovalResult Approved = 1, Rejected = 2 } + + [Fact] + public async Task ShouldHandleMultipleExternalEvents() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(MultiEventWorkflow), workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + // Raise multiple events + await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event1", "FirstData"); + await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event2", 42); + await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event3", true); + + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var output = result.ReadOutputAs(); + Assert.Equal("FirstData-42-True", output); + } + + private sealed class MultiEventWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, object? input) + { + var event1 = await context.WaitForExternalEventAsync("Event1"); + var event2 = await context.WaitForExternalEventAsync("Event2"); + var event3 = await context.WaitForExternalEventAsync("Event3"); + + return $"{event1}-{event2}-{event3}"; + } + } internal sealed partial class OrderProcessingWorkflow : Workflow { diff --git a/test/Dapr.IntegrationTest.Workflow/ParallelExecutionTests.cs b/test/Dapr.IntegrationTest.Workflow/ParallelExecutionTests.cs new file mode 100644 index 000000000..3e62d1035 --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/ParallelExecutionTests.cs @@ -0,0 +1,150 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class ParallelExecutionTests +{ + [Fact] + public async Task ShouldExecuteActivitiesInParallel() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => + { + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + var startTime = DateTime.UtcNow; + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(ParallelWorkflow), workflowInstanceId); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + var endTime = DateTime.UtcNow; + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var outputs = result.ReadOutputAs>(); + Assert.NotNull(outputs); + Assert.Equal(3, outputs.Count); + + // Should complete in ~2 seconds (parallel) not 6 seconds (sequential) + var duration = endTime - startTime; + Assert.True(duration.TotalSeconds < 5, $"Expected parallel execution to complete in < 5 seconds, took {duration.TotalSeconds}"); + } + + [Fact] + public async Task ShouldFanOutFanInWithActivities() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => + { + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(FanOutFanInWorkflow), workflowInstanceId, (int[])[1, 2, 3, 4 + ]); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var sum = result.ReadOutputAs(); + Assert.Equal(30, sum); // 1² + 2² + 3² + 4² = 1 + 4 + 9 + 16 = 30 + } + + private sealed class ParallelWorkflow : Workflow> + { + public override async Task> RunAsync(WorkflowContext context, object? input) + { + var tasks = new List> + { + context.CallActivityAsync(nameof(DelayedActivity), "Task1"), + context.CallActivityAsync(nameof(DelayedActivity), "Task2"), + context.CallActivityAsync(nameof(DelayedActivity), "Task3") + }; + + var results = await Task.WhenAll(tasks); + return results.ToList(); + } + } + + private sealed class FanOutFanInWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, int[] input) + { + var tasks = input.Select(num => context.CallActivityAsync(nameof(SquareActivity), num)).ToList(); + var results = await Task.WhenAll(tasks); + return results.Sum(); + } + } + + private sealed class DelayedActivity : WorkflowActivity + { + public override async Task RunAsync(WorkflowActivityContext context, string input) + { + await Task.Delay(TimeSpan.FromSeconds(2)); + return $"Completed: {input}"; + } + } + + private sealed class SquareActivity : WorkflowActivity + { + public override Task RunAsync(WorkflowActivityContext context, int input) + { + return Task.FromResult(input * input); + } + } +} diff --git a/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs b/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs new file mode 100644 index 000000000..45c2c4f9b --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/PauseResumeTests.cs @@ -0,0 +1,78 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class PauseResumeTests +{ + [Fact] + public async Task ShouldPauseAndResumeWorkflow() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => opt.RegisterWorkflow(), + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(WaitingWorkflow), workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + // Pause the workflow + await daprWorkflowClient.SuspendWorkflowAsync(workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + var pausedState = await daprWorkflowClient.GetWorkflowStateAsync(workflowInstanceId); + Assert.NotNull(pausedState); + Assert.Equal(WorkflowRuntimeStatus.Suspended, pausedState.RuntimeStatus); + + // Resume the workflow + await daprWorkflowClient.ResumeWorkflowAsync(workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + var resumedState = await daprWorkflowClient.GetWorkflowStateAsync(workflowInstanceId); + Assert.NotNull(resumedState); + Assert.Equal(WorkflowRuntimeStatus.Running, resumedState.RuntimeStatus); + } + + private sealed class WaitingWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, object? input) + { + await context.CreateTimer(TimeSpan.FromMinutes(5)); + return "Completed"; + } + } +} diff --git a/test/Dapr.IntegrationTest.Workflow/PurgeTests.cs b/test/Dapr.IntegrationTest.Workflow/PurgeTests.cs new file mode 100644 index 000000000..500db6068 --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/PurgeTests.cs @@ -0,0 +1,73 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class PurgeTests +{ + [Fact] + public async Task ShouldPurgeCompletedWorkflowInstance() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => opt.RegisterWorkflow(), + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(SimpleWorkflow), workflowInstanceId, "test"); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + + // Verify workflow state exists before purge + var stateBeforePurge = await daprWorkflowClient.GetWorkflowStateAsync(workflowInstanceId); + Assert.NotNull(stateBeforePurge); + + // Purge the workflow instance + await daprWorkflowClient.PurgeInstanceAsync(workflowInstanceId); + + // Verify workflow state no longer exists after purge + var stateAfterPurge = await daprWorkflowClient.GetWorkflowStateAsync(workflowInstanceId); + Assert.Null(stateAfterPurge); + } + + private sealed class SimpleWorkflow : Workflow + { + public override Task RunAsync(WorkflowContext context, string input) + { + return Task.FromResult($"Processed: {input}"); + } + } +} diff --git a/test/Dapr.IntegrationTest.Workflow/SubworkflowTests.cs b/test/Dapr.IntegrationTest.Workflow/SubworkflowTests.cs index cf6ffe6d4..0c0af801c 100644 --- a/test/Dapr.IntegrationTest.Workflow/SubworkflowTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/SubworkflowTests.cs @@ -63,6 +63,79 @@ public async Task ShouldHandleSubworkflow() var subworkflowResultValue = subworkflowResult.ReadOutputAs(); Assert.True(subworkflowResultValue); } + + [Fact] + public async Task ShouldHandleMultipleParallelSubworkflows() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + opt.RegisterWorkflow(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrWhiteSpace(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync( + nameof(ParallelSubworkflowsWorkflow), + workflowInstanceId); + + var workflowResult = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + Assert.Equal(WorkflowRuntimeStatus.Completed, workflowResult.RuntimeStatus); + var results = workflowResult.ReadOutputAs(); + Assert.NotNull(results); + Assert.Equal(3, results.Length); + Assert.Equal([10, 20, 30], results); + } + + private sealed class ParallelSubworkflowsWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, object? input) + { + var tasks = new List>(); + + for (int i = 1; i <= 3; i++) + { + var subInstanceId = $"{context.InstanceId}-sub-{i}"; + var options = new ChildWorkflowTaskOptions(subInstanceId); + var task = context.CallChildWorkflowAsync( + nameof(ProcessingSubworkflow), + i * 10, + options); + tasks.Add(task); + } + + var results = await Task.WhenAll(tasks); + return results; + } + } + + private sealed class ProcessingSubworkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, int input) + { + await context.CreateTimer(TimeSpan.FromSeconds(2)); + return input; + } + } private sealed class DemoWorkflow : Workflow { diff --git a/test/Dapr.IntegrationTest.Workflow/TerminationTests.cs b/test/Dapr.IntegrationTest.Workflow/TerminationTests.cs new file mode 100644 index 000000000..0d3513cb3 --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/TerminationTests.cs @@ -0,0 +1,68 @@ +// // ------------------------------------------------------------------------ +// // Copyright 2025 The Dapr Authors +// // Licensed under the Apache License, Version 2.0 (the "License"); +// // you may not use this file except in compliance with the License. +// // You may obtain a copy of the License at +// // http://www.apache.org/licenses/LICENSE-2.0 +// // Unless required by applicable law or agreed to in writing, software +// // distributed under the License is distributed on an "AS IS" BASIS, +// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// // See the License for the specific language governing permissions and +// // limitations under the License. +// // ------------------------------------------------------------------------ +// +// using Dapr.TestContainers.Common; +// using Dapr.TestContainers.Common.Options; +// using Dapr.Workflow; +// using Microsoft.Extensions.Configuration; +// using Microsoft.Extensions.DependencyInjection; +// +// namespace Dapr.IntegrationTest.Workflow; +// +// public sealed class TerminationTests +// { +// [Fact] +// public async Task ShouldTerminateRunningWorkflow() +// { +// var options = new DaprRuntimeOptions(); +// var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); +// var workflowInstanceId = Guid.NewGuid().ToString(); +// +// var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); +// await using var testApp = await DaprHarnessBuilder.ForHarness(harness) +// .ConfigureServices(builder => +// { +// builder.Services.AddDaprWorkflowBuilder( +// opt => opt.RegisterWorkflow(), +// configureClient: (sp, cb) => +// { +// var config = sp.GetRequiredService(); +// var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; +// if (!string.IsNullOrEmpty(grpcEndpoint)) +// cb.UseGrpcEndpoint(grpcEndpoint); +// }); +// }) +// .BuildAndStartAsync(); +// +// using var scope = testApp.CreateScope(); +// var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); +// +// await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(LongRunningWorkflow), workflowInstanceId); +// await Task.Delay(TimeSpan.FromSeconds(2)); +// +// await daprWorkflowClient.TerminateWorkflowAsync(workflowInstanceId, "User requested termination"); +// +// var state = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); +// Assert.NotNull(state); +// Assert.Equal(WorkflowRuntimeStatus.Terminated, state.RuntimeStatus); +// } +// +// private sealed class LongRunningWorkflow : Workflow +// { +// public override async Task RunAsync(WorkflowContext context, object? input) +// { +// await context.CreateTimer(TimeSpan.FromMinutes(10)); +// return "Completed"; +// } +// } +// } diff --git a/test/Dapr.IntegrationTest.Workflow/TimerTests.cs b/test/Dapr.IntegrationTest.Workflow/TimerTests.cs new file mode 100644 index 000000000..cbc06906f --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/TimerTests.cs @@ -0,0 +1,93 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class TimerTests +{ + private const string InitialMessage = "Test1"; + private const string FinalMessage = "Test2"; + + [Fact] + public async Task ValidateStatusMessagesWithDelay() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => + { + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(TestWorkflow), workflowInstanceId, 8); + await Task.Delay(TimeSpan.FromSeconds(3)); + + // Get the initial status + var initialStatus = await daprWorkflowClient.GetWorkflowStateAsync(workflowInstanceId); + Assert.NotNull(initialStatus); + Assert.Equal(WorkflowRuntimeStatus.Running, initialStatus.RuntimeStatus); + var initialStatusResult = initialStatus.ReadCustomStatusAs(); + Assert.Equal(InitialMessage, initialStatusResult); + + // Wait 20 seconds + await Task.Delay(TimeSpan.FromSeconds(20)); + + // Get the current status + var finalStatus = await daprWorkflowClient.GetWorkflowStateAsync(workflowInstanceId); + Assert.NotNull(finalStatus); + var finalStatusResult = finalStatus.ReadCustomStatusAs(); + Assert.Equal(FinalMessage, finalStatusResult); + Assert.Equal(WorkflowRuntimeStatus.Completed, finalStatus.RuntimeStatus); + } + + private sealed class TestWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, int input) + { + context.SetCustomStatus(InitialMessage); + await context.CreateTimer(TimeSpan.FromSeconds(15)); + context.SetCustomStatus(FinalMessage); + return await context.CallActivityAsync(nameof(TestActivity), input); + } + } + + private sealed class TestActivity : WorkflowActivity + { + public override Task RunAsync(WorkflowActivityContext context, int input) => Task.FromResult(input * 2); + } +}