diff --git a/test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs b/test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs index 1c79e6272..0d272520b 100644 --- a/test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs @@ -29,7 +29,7 @@ public sealed partial class ExternalInputWorkflowTests new("Cars", 15000, 100), new("Computers", 500, 100) ]; - + [Fact] public async Task ShouldHandleMultipleExternalEvents_Simple() { @@ -72,14 +72,14 @@ public async Task ShouldHandleMultipleExternalEvents_Simple() var output = result.ReadOutputAs(); Assert.Equal("FirstData-42-True", output); } - + [Fact] public async Task ShouldHandleStandardWorkflowsWithDependencyInjection() { var options = new DaprRuntimeOptions(); var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); var workflowInstanceId = Guid.NewGuid().ToString(); - + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); await using var testApp = await DaprHarnessBuilder.ForHarness(harness) .ConfigureServices(builder => @@ -95,7 +95,7 @@ public async Task ShouldHandleStandardWorkflowsWithDependencyInjection() if (!string.IsNullOrEmpty(grpcEndpoint)) b.UseGrpcEndpoint(grpcEndpoint); }); - + // Register the Dapr Workflow client builder.Services.AddDaprWorkflowBuilder( configureRuntime: opt => @@ -119,7 +119,7 @@ public async Task ShouldHandleStandardWorkflowsWithDependencyInjection() // Clean test logic using var scope = testApp.CreateScope(); - + // Set up the base inventory in the Dapr state management store var daprClient = scope.ServiceProvider.GetRequiredService(); foreach (var baseInventoryItem in BaseInventory) @@ -127,9 +127,9 @@ public async Task ShouldHandleStandardWorkflowsWithDependencyInjection() await daprClient.SaveStateAsync(TestContainers.Constants.DaprComponentNames.StateManagementComponentName, baseInventoryItem.Name.ToLowerInvariant(), baseInventoryItem); } - + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); - + // Create an order under the threshold const string itemName = "Computers"; const int amount = 3; @@ -137,12 +137,12 @@ await daprClient.SaveStateAsync(TestContainers.Constants.DaprComponentNames.Stat string.Equals(item.Name, itemName, StringComparison.OrdinalIgnoreCase)); var totalCost = amount * item.PerItemCost; var orderInfo = new OrderPayload(itemName.ToLowerInvariant(), totalCost, amount); - + // Start the workflow await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow), workflowInstanceId, orderInfo); - - + + // Wait for the workflow to complete - it shouldn't ask for approval var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); @@ -151,6 +151,116 @@ await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow Assert.True(resultValue.Processed); } + [Fact] + public async Task ShouldHandleExternalEventTimeout() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(TimeoutWorkflow), workflowInstanceId); + + // Don't send event, let it timeout + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var output = result.ReadOutputAs(); + Assert.Equal("Timeout", output); + } + + [Fact] + public async Task ShouldHandleExternalEventWithDefaultValue() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(DefaultValueWorkflow), workflowInstanceId); + await Task.Delay(TimeSpan.FromSeconds(2)); + + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var output = result.ReadOutputAs(); + Assert.Equal(42, output); // Default value + } + + private sealed class TimeoutWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, object? input) + { + try + { + await context.WaitForExternalEventAsync("ApprovalEvent", TimeSpan.FromSeconds(5)); + return "Received"; + } + catch (TaskCanceledException) + { + return "Timeout"; + } + } + } + + private sealed class DefaultValueWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, object? input) + { + try + { + var value = await context.WaitForExternalEventAsync("ValueEvent", TimeSpan.FromSeconds(5)); + return value; + } + catch (TaskCanceledException) + { + return 42; // Default value + } + } + } + public sealed record Notification(string Message); public sealed record OrderPayload(string Name, double TotalCost, int Quantity); @@ -160,15 +270,18 @@ public sealed record InventoryRequest(string RequestId, string ItemName, int Qua public sealed record InventoryResult(bool Success, InventoryItem? Item); public sealed record PaymentRequest(string RequestId, string ItemName, int Amount, double Currency); + public sealed record InventoryItem(string Name, double PerItemCost, int Quantity); + public sealed record OrderResult(bool Processed); + public enum ApprovalResult { Unspecified = 0, Approved = 1, Rejected = 2 } - + [Fact] public async Task ShouldHandleMultipleExternalEvents() { @@ -315,7 +428,7 @@ await context.CallActivityAsync( try { // There is enough inventory available so the user can purchase the item(s). Process their payment - + await context.CallActivityAsync( nameof(UpdateInventoryActivity), new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost), @@ -342,12 +455,14 @@ await context.CallActivityAsync( } [LoggerMessage(LogLevel.Information, "Received order '{OrderId}' for {Quantity} of {ItemName} at ${TotalCost}")] - private static partial void LogReceivedOrder(ILogger logger, string orderId, int quantity, string itemName, double totalCost); + private static partial void LogReceivedOrder(ILogger logger, string orderId, int quantity, string itemName, + double totalCost); [LoggerMessage(LogLevel.Error, "Insufficient inventory for '{OrderName}'")] private static partial void LogInsufficientInventory(ILogger logger, string orderName); - [LoggerMessage(LogLevel.Information, "Requesting manager approval since total cost {TotalCost} exceeds threshold {Threshold}")] + [LoggerMessage(LogLevel.Information, + "Requesting manager approval since total cost {TotalCost} exceeds threshold {Threshold}")] private static partial void LogRequestingApproval(ILogger logger, double totalCost, double threshold); [LoggerMessage(LogLevel.Information, "Approval result: {ApprovalResult}")] @@ -355,10 +470,10 @@ await context.CallActivityAsync( [LoggerMessage(LogLevel.Information, "Processing payment as sufficient inventory is available")] private static partial void LogProcessingPayment(ILogger logger); - + [LoggerMessage(LogLevel.Error, "Cancelling order because it didn't receive an approval")] private static partial void LogCancelingOrder(ILogger logger); - + [LoggerMessage(LogLevel.Error, "Order {OrderId} failed! Details: {ErrorMessage}")] private static partial void LogOrderFailed(ILogger logger, string orderId, string errorMessage); @@ -366,27 +481,33 @@ await context.CallActivityAsync( private static partial void LogOrderCompleted(ILogger logger, string orderId); } - public sealed partial class UpdateInventoryActivity(ILogger logger, DaprClient daprClient) : WorkflowActivity + public sealed partial class UpdateInventoryActivity(ILogger logger, DaprClient daprClient) + : WorkflowActivity { public override async Task RunAsync(WorkflowActivityContext context, PaymentRequest request) { LogInventoryCheck(request.RequestId, request.Amount, request.ItemName); - + // Simulate slow processing await Task.Delay((TimeSpan.FromSeconds(5))); - + // Determine if there are enough items for purchase - var item = await daprClient.GetStateAsync(TestContainers.Constants.DaprComponentNames.StateManagementComponentName, request.ItemName.ToLowerInvariant()); + var item = await daprClient.GetStateAsync( + TestContainers.Constants.DaprComponentNames.StateManagementComponentName, + request.ItemName.ToLowerInvariant()); var newQuantity = item.Quantity - request.Amount; if (newQuantity < 0) { LogInsufficientInventory(request.RequestId, request.Amount, item.Quantity, request.ItemName); - throw new InvalidOperationException($"Not enough '{request.ItemName}' inventory! Requested {request.Amount} but only {item.Quantity} available."); + throw new InvalidOperationException( + $"Not enough '{request.ItemName}' inventory! Requested {request.Amount} but only {item.Quantity} available."); } - + // Update the state store with the new amount of the item - await daprClient.SaveStateAsync(TestContainers.Constants.DaprComponentNames.StateManagementComponentName, request.ItemName.ToLowerInvariant(), new InventoryItem(request.ItemName, item.PerItemCost, newQuantity)); - + await daprClient.SaveStateAsync(TestContainers.Constants.DaprComponentNames.StateManagementComponentName, + request.ItemName.ToLowerInvariant(), + new InventoryItem(request.ItemName, item.PerItemCost, newQuantity)); + LogUpdatedInventory(newQuantity, item.Name); return null; } @@ -409,31 +530,36 @@ public sealed partial class ProcessPaymentActivity(ILogger RunAsync(WorkflowActivityContext context, PaymentRequest input) { LogProcessing(input.RequestId, input.Amount, input.ItemName, input.Currency); - + // Simulate slow processing await Task.Delay(TimeSpan.FromSeconds(7)); - + LogProcessingSuccessful(input.RequestId); return null; } - [LoggerMessage(LogLevel.Information, "Processing payment for order {RequestId} for {Amount} of {ItemName} at ${Currency}")] + [LoggerMessage(LogLevel.Information, + "Processing payment for order {RequestId} for {Amount} of {ItemName} at ${Currency}")] private partial void LogProcessing(string requestId, double amount, string itemName, double currency); - + [LoggerMessage(LogLevel.Information, "Payment for request ID '{RequestId}' processed successfully")] private partial void LogProcessingSuccessful(string requestId); } - public sealed partial class ReserveInventoryActivity(ILogger logger, DaprClient daprClient) + public sealed partial class ReserveInventoryActivity( + ILogger logger, + DaprClient daprClient) : WorkflowActivity { public override async Task RunAsync(WorkflowActivityContext context, InventoryRequest req) { LogReservation(req.RequestId, req.Quantity, req.ItemName); - + // Ensure that the store has items - var item = await daprClient.GetStateAsync(TestContainers.Constants.DaprComponentNames.StateManagementComponentName, req.ItemName.ToLowerInvariant()); - + var item = await daprClient.GetStateAsync( + TestContainers.Constants.DaprComponentNames.StateManagementComponentName, + req.ItemName.ToLowerInvariant()); + // Catch for the case where the statestore isn't set up if (item == null) { @@ -442,7 +568,7 @@ public override async Task RunAsync(WorkflowActivityContext con } LogAvailability(item.Quantity, item.Name); - + // See if there are enough items to purchase if (item.Quantity >= req.Quantity) { @@ -450,7 +576,7 @@ public override async Task RunAsync(WorkflowActivityContext con await Task.Delay(TimeSpan.FromSeconds(2)); return new InventoryResult(true, item); } - + // Not enough items return new InventoryResult(false, item); } @@ -461,7 +587,7 @@ public override async Task RunAsync(WorkflowActivityContext con [LoggerMessage(LogLevel.Information, "There are {Quantity} {ItemName} available for purchase")] private partial void LogAvailability(int Quantity, string ItemName); } - + public sealed partial class RequestApprovalActivity(ILogger logger) : WorkflowActivity { @@ -472,7 +598,7 @@ public sealed partial class RequestApprovalActivity(ILogger(null); } - [LoggerMessage(LogLevel.Information, "Requesting approval for order {Orderid}" )] + [LoggerMessage(LogLevel.Information, "Requesting approval for order {Orderid}")] private partial void LogApprovalRequest(string orderId); } diff --git a/test/Dapr.IntegrationTest.Workflow/ReplaySafetyTests.cs b/test/Dapr.IntegrationTest.Workflow/ReplaySafetyTests.cs new file mode 100644 index 000000000..7c8963d6d --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/ReplaySafetyTests.cs @@ -0,0 +1,149 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.TestContainers.Common; +using Dapr.TestContainers.Common.Options; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed partial class ReplaySafetyTests +{ + [Fact] + public async Task ReplaySafeLogger_ShouldNotDuplicateLogsOnReplay() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(LoggingWorkflow), workflowInstanceId, "test"); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var output = result.ReadOutputAs(); + Assert.Equal("Completed", output); + } + + [Fact] + public async Task Workflow_ShouldUseDeterministicGuidGeneration() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(DeterministicGuidWorkflow), workflowInstanceId); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var guids = result.ReadOutputAs(); + Assert.NotNull(guids); + Assert.Equal(3, guids.Length); + + // All GUIDs should be different but deterministic + Assert.Equal(guids.Length, guids.Distinct().Count()); + Assert.All(guids, g => Assert.NotEqual(Guid.Empty, g)); + } + + private sealed partial class LoggingWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, string input) + { + var logger = context.CreateReplaySafeLogger(); + + LogWorkflowStarted(logger, context.InstanceId); + + await context.CallActivityAsync(nameof(SimpleActivity), input); + + LogWorkflowCompleted(logger, context.InstanceId); + + return "Completed"; + } + + [LoggerMessage(LogLevel.Information, "Workflow {InstanceId} started")] + private static partial void LogWorkflowStarted(ILogger logger, string instanceId); + + [LoggerMessage(LogLevel.Information, "Workflow {InstanceId} completed")] + private static partial void LogWorkflowCompleted(ILogger logger, string instanceId); + } + + private sealed class DeterministicGuidWorkflow : Workflow + { + public override Task RunAsync(WorkflowContext context, object? input) + { + var guids = new[] + { + context.NewGuid(), + context.NewGuid(), + context.NewGuid() + }; + + return Task.FromResult(guids); + } + } + + private sealed class SimpleActivity : WorkflowActivity + { + public override Task RunAsync(WorkflowActivityContext context, string input) + { + return Task.FromResult($"Processed: {input}"); + } + } +}