Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 162 additions & 36 deletions test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public sealed partial class ExternalInputWorkflowTests
new("Cars", 15000, 100),
new("Computers", 500, 100)
];

[Fact]
public async Task ShouldHandleMultipleExternalEvents_Simple()
{
Expand Down Expand Up @@ -72,14 +72,14 @@ public async Task ShouldHandleMultipleExternalEvents_Simple()
var output = result.ReadOutputAs<string>();
Assert.Equal("FirstData-42-True", output);
}

[Fact]
public async Task ShouldHandleStandardWorkflowsWithDependencyInjection()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
Expand All @@ -95,7 +95,7 @@ public async Task ShouldHandleStandardWorkflowsWithDependencyInjection()
if (!string.IsNullOrEmpty(grpcEndpoint))
b.UseGrpcEndpoint(grpcEndpoint);
});

// Register the Dapr Workflow client
builder.Services.AddDaprWorkflowBuilder(
configureRuntime: opt =>
Expand All @@ -119,30 +119,30 @@ public async Task ShouldHandleStandardWorkflowsWithDependencyInjection()

// Clean test logic
using var scope = testApp.CreateScope();

// Set up the base inventory in the Dapr state management store
var daprClient = scope.ServiceProvider.GetRequiredService<DaprClient>();
foreach (var baseInventoryItem in BaseInventory)
{
await daprClient.SaveStateAsync(TestContainers.Constants.DaprComponentNames.StateManagementComponentName,
baseInventoryItem.Name.ToLowerInvariant(), baseInventoryItem);
}

var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

// Create an order under the threshold
const string itemName = "Computers";
const int amount = 3;
var item = BaseInventory.First(item =>
string.Equals(item.Name, itemName, StringComparison.OrdinalIgnoreCase));
var totalCost = amount * item.PerItemCost;
var orderInfo = new OrderPayload(itemName.ToLowerInvariant(), totalCost, amount);

// Start the workflow
await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow), workflowInstanceId,
orderInfo);


// Wait for the workflow to complete - it shouldn't ask for approval
var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId);
Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
Expand All @@ -151,6 +151,116 @@ await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow
Assert.True(resultValue.Processed);
}

[Fact]
public async Task ShouldHandleExternalEventTimeout()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
configureRuntime: opt =>
{
opt.RegisterWorkflow<TimeoutWorkflow>();
},
configureClient: (sp, clientBuilder) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(TimeoutWorkflow), workflowInstanceId);

// Don't send event, let it timeout
var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId);

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
var output = result.ReadOutputAs<string>();
Assert.Equal("Timeout", output);
}

[Fact]
public async Task ShouldHandleExternalEventWithDefaultValue()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
configureRuntime: opt =>
{
opt.RegisterWorkflow<DefaultValueWorkflow>();
},
configureClient: (sp, clientBuilder) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(DefaultValueWorkflow), workflowInstanceId);
await Task.Delay(TimeSpan.FromSeconds(2));

var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId);

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
var output = result.ReadOutputAs<int>();
Assert.Equal(42, output); // Default value
}

private sealed class TimeoutWorkflow : Workflow<object?, string>
{
public override async Task<string> RunAsync(WorkflowContext context, object? input)
{
try
{
await context.WaitForExternalEventAsync<string>("ApprovalEvent", TimeSpan.FromSeconds(5));
return "Received";
}
catch (TaskCanceledException)
{
return "Timeout";
}
}
}

private sealed class DefaultValueWorkflow : Workflow<object?, int>
{
public override async Task<int> RunAsync(WorkflowContext context, object? input)
{
try
{
var value = await context.WaitForExternalEventAsync<int>("ValueEvent", TimeSpan.FromSeconds(5));
return value;
}
catch (TaskCanceledException)
{
return 42; // Default value
}
}
}

public sealed record Notification(string Message);

public sealed record OrderPayload(string Name, double TotalCost, int Quantity);
Expand All @@ -160,15 +270,18 @@ public sealed record InventoryRequest(string RequestId, string ItemName, int Qua
public sealed record InventoryResult(bool Success, InventoryItem? Item);

public sealed record PaymentRequest(string RequestId, string ItemName, int Amount, double Currency);

public sealed record InventoryItem(string Name, double PerItemCost, int Quantity);

public sealed record OrderResult(bool Processed);

public enum ApprovalResult
{
Unspecified = 0,
Approved = 1,
Rejected = 2
}

[Fact]
public async Task ShouldHandleMultipleExternalEvents()
{
Expand Down Expand Up @@ -315,7 +428,7 @@ await context.CallActivityAsync(
try
{
// There is enough inventory available so the user can purchase the item(s). Process their payment

await context.CallActivityAsync(
nameof(UpdateInventoryActivity),
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost),
Expand All @@ -342,51 +455,59 @@ await context.CallActivityAsync(
}

[LoggerMessage(LogLevel.Information, "Received order '{OrderId}' for {Quantity} of {ItemName} at ${TotalCost}")]
private static partial void LogReceivedOrder(ILogger logger, string orderId, int quantity, string itemName, double totalCost);
private static partial void LogReceivedOrder(ILogger logger, string orderId, int quantity, string itemName,
double totalCost);

[LoggerMessage(LogLevel.Error, "Insufficient inventory for '{OrderName}'")]
private static partial void LogInsufficientInventory(ILogger logger, string orderName);

[LoggerMessage(LogLevel.Information, "Requesting manager approval since total cost {TotalCost} exceeds threshold {Threshold}")]
[LoggerMessage(LogLevel.Information,
"Requesting manager approval since total cost {TotalCost} exceeds threshold {Threshold}")]
private static partial void LogRequestingApproval(ILogger logger, double totalCost, double threshold);

[LoggerMessage(LogLevel.Information, "Approval result: {ApprovalResult}")]
private static partial void LogApprovalResult(ILogger logger, ApprovalResult approvalResult);

[LoggerMessage(LogLevel.Information, "Processing payment as sufficient inventory is available")]
private static partial void LogProcessingPayment(ILogger logger);

[LoggerMessage(LogLevel.Error, "Cancelling order because it didn't receive an approval")]
private static partial void LogCancelingOrder(ILogger logger);

[LoggerMessage(LogLevel.Error, "Order {OrderId} failed! Details: {ErrorMessage}")]
private static partial void LogOrderFailed(ILogger logger, string orderId, string errorMessage);

[LoggerMessage(LogLevel.Information, "Order {OrderId} has completed!")]
private static partial void LogOrderCompleted(ILogger logger, string orderId);
}

public sealed partial class UpdateInventoryActivity(ILogger<UpdateInventoryActivity> logger, DaprClient daprClient) : WorkflowActivity<PaymentRequest, object?>
public sealed partial class UpdateInventoryActivity(ILogger<UpdateInventoryActivity> logger, DaprClient daprClient)
: WorkflowActivity<PaymentRequest, object?>
{
public override async Task<object?> RunAsync(WorkflowActivityContext context, PaymentRequest request)
{
LogInventoryCheck(request.RequestId, request.Amount, request.ItemName);

// Simulate slow processing
await Task.Delay((TimeSpan.FromSeconds(5)));

// Determine if there are enough items for purchase
var item = await daprClient.GetStateAsync<InventoryItem>(TestContainers.Constants.DaprComponentNames.StateManagementComponentName, request.ItemName.ToLowerInvariant());
var item = await daprClient.GetStateAsync<InventoryItem>(
TestContainers.Constants.DaprComponentNames.StateManagementComponentName,
request.ItemName.ToLowerInvariant());
var newQuantity = item.Quantity - request.Amount;
if (newQuantity < 0)
{
LogInsufficientInventory(request.RequestId, request.Amount, item.Quantity, request.ItemName);
throw new InvalidOperationException($"Not enough '{request.ItemName}' inventory! Requested {request.Amount} but only {item.Quantity} available.");
throw new InvalidOperationException(
$"Not enough '{request.ItemName}' inventory! Requested {request.Amount} but only {item.Quantity} available.");
}

// Update the state store with the new amount of the item
await daprClient.SaveStateAsync(TestContainers.Constants.DaprComponentNames.StateManagementComponentName, request.ItemName.ToLowerInvariant(), new InventoryItem(request.ItemName, item.PerItemCost, newQuantity));

await daprClient.SaveStateAsync(TestContainers.Constants.DaprComponentNames.StateManagementComponentName,
request.ItemName.ToLowerInvariant(),
new InventoryItem(request.ItemName, item.PerItemCost, newQuantity));

LogUpdatedInventory(newQuantity, item.Name);
return null;
}
Expand All @@ -409,31 +530,36 @@ public sealed partial class ProcessPaymentActivity(ILogger<ProcessPaymentActivit
public override async Task<object?> RunAsync(WorkflowActivityContext context, PaymentRequest input)
{
LogProcessing(input.RequestId, input.Amount, input.ItemName, input.Currency);

// Simulate slow processing
await Task.Delay(TimeSpan.FromSeconds(7));

LogProcessingSuccessful(input.RequestId);
return null;
}

[LoggerMessage(LogLevel.Information, "Processing payment for order {RequestId} for {Amount} of {ItemName} at ${Currency}")]
[LoggerMessage(LogLevel.Information,
"Processing payment for order {RequestId} for {Amount} of {ItemName} at ${Currency}")]
private partial void LogProcessing(string requestId, double amount, string itemName, double currency);

[LoggerMessage(LogLevel.Information, "Payment for request ID '{RequestId}' processed successfully")]
private partial void LogProcessingSuccessful(string requestId);
}

public sealed partial class ReserveInventoryActivity(ILogger<ReserveInventoryActivity> logger, DaprClient daprClient)
public sealed partial class ReserveInventoryActivity(
ILogger<ReserveInventoryActivity> logger,
DaprClient daprClient)
: WorkflowActivity<InventoryRequest, InventoryResult>
{
public override async Task<InventoryResult> RunAsync(WorkflowActivityContext context, InventoryRequest req)
{
LogReservation(req.RequestId, req.Quantity, req.ItemName);

// Ensure that the store has items
var item = await daprClient.GetStateAsync<InventoryItem?>(TestContainers.Constants.DaprComponentNames.StateManagementComponentName, req.ItemName.ToLowerInvariant());

var item = await daprClient.GetStateAsync<InventoryItem?>(
TestContainers.Constants.DaprComponentNames.StateManagementComponentName,
req.ItemName.ToLowerInvariant());

// Catch for the case where the statestore isn't set up
if (item == null)
{
Expand All @@ -442,15 +568,15 @@ public override async Task<InventoryResult> RunAsync(WorkflowActivityContext con
}

LogAvailability(item.Quantity, item.Name);

// See if there are enough items to purchase
if (item.Quantity >= req.Quantity)
{
// Simulate slow processing
await Task.Delay(TimeSpan.FromSeconds(2));
return new InventoryResult(true, item);
}

// Not enough items
return new InventoryResult(false, item);
}
Expand All @@ -461,7 +587,7 @@ public override async Task<InventoryResult> RunAsync(WorkflowActivityContext con
[LoggerMessage(LogLevel.Information, "There are {Quantity} {ItemName} available for purchase")]
private partial void LogAvailability(int Quantity, string ItemName);
}

public sealed partial class RequestApprovalActivity(ILogger<RequestApprovalActivity> logger)
: WorkflowActivity<OrderPayload, object?>
{
Expand All @@ -472,7 +598,7 @@ public sealed partial class RequestApprovalActivity(ILogger<RequestApprovalActiv
return Task.FromResult<object?>(null);
}

[LoggerMessage(LogLevel.Information, "Requesting approval for order {Orderid}" )]
[LoggerMessage(LogLevel.Information, "Requesting approval for order {Orderid}")]
private partial void LogApprovalRequest(string orderId);
}

Expand Down
Loading