Skip to content
69 changes: 69 additions & 0 deletions test/Dapr.IntegrationTest.Workflow/ContinueAsNewTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.TestContainers.Common;
using Dapr.TestContainers.Common.Options;
using Dapr.Workflow;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Dapr.IntegrationTest.Workflow;

public sealed class ContinueAsNewTests
{
[Fact]
public async Task ShouldContinueAsNewUntilComplete()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
opt => opt.RegisterWorkflow<CounterWorkflow>(),
configureClient: (sp, cb) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
cb.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(CounterWorkflow), workflowInstanceId, 0);
var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId);

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
var finalCount = result.ReadOutputAs<int>();
Assert.Equal(5, finalCount);
}

private sealed class CounterWorkflow : Workflow<int, int>
{
public override Task<int> RunAsync(WorkflowContext context, int currentCount)
{
if (currentCount < 5)
{
context.ContinueAsNew(currentCount + 1);
}
return Task.FromResult(currentCount);
}
}
}
152 changes: 152 additions & 0 deletions test/Dapr.IntegrationTest.Workflow/ErrorHandlingTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.TestContainers.Common;
using Dapr.TestContainers.Common.Options;
using Dapr.Workflow;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Dapr.IntegrationTest.Workflow;

public sealed class ErrorHandlingTests
{
[Fact]
public async Task ShouldRetryFailedActivityAndSucceed()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
opt =>
{
opt.RegisterWorkflow<RetryWorkflow>();
opt.RegisterActivity<FlakyActivity>();
},
configureClient: (sp, cb) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
cb.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(RetryWorkflow), workflowInstanceId, 0);
var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId);

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
var output = result.ReadOutputAs<string>();
Assert.Equal("Success after retries", output);
}

[Fact]
public async Task ShouldCancelTimerOnExternalEvent()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
opt => opt.RegisterWorkflow<CancellableTimerWorkflow>(),
configureClient: (sp, cb) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
cb.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

var startTime = DateTime.UtcNow;
await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(CancellableTimerWorkflow), workflowInstanceId);
await Task.Delay(TimeSpan.FromSeconds(2));

// Cancel the timer by raising an event
await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "CancelTimer");

var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId);
var duration = DateTime.UtcNow - startTime;

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
var output = result.ReadOutputAs<string>();
Assert.Equal("Cancelled", output);

// Verify it completed quickly (not after full timer)
Assert.True(duration.TotalSeconds < 10, "Timer should have been cancelled early");
}

private sealed class CancellableTimerWorkflow : Workflow<object?, string>
{
public override async Task<string> RunAsync(WorkflowContext context, object? input)
{
var timerTask = context.CreateTimer(TimeSpan.FromMinutes(5));
var eventTask = context.WaitForExternalEventAsync<object>("CancelTimer");

var completedTask = await Task.WhenAny(timerTask, eventTask);

if (completedTask == eventTask)
{
return "Cancelled";
}

return "Completed";
}
}

private sealed class RetryWorkflow : Workflow<int, string>
{
public override async Task<string> RunAsync(WorkflowContext context, int attemptNumber)
{
try
{
await context.CallActivityAsync<string>(nameof(FlakyActivity), attemptNumber);
return "Success after retries";
}
catch (Exception) when (attemptNumber < 2)
{
// Retry by continuing the workflow with incremented attempt
context.ContinueAsNew(attemptNumber + 1);
return string.Empty; // Won't be reached
}
}
}

private sealed class FlakyActivity : WorkflowActivity<int, string>
{
public override Task<string> RunAsync(WorkflowActivityContext context, int attemptCount)
{
if (attemptCount < 2)
throw new InvalidOperationException("Simulated failure");

return Task.FromResult("Success");
}
}
}
100 changes: 99 additions & 1 deletion test/Dapr.IntegrationTest.Workflow/ExternalInputWorkflowTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,56 @@ namespace Dapr.IntegrationTest.Workflow;

public sealed partial class ExternalInputWorkflowTests
{
private List<InventoryItem> BaseInventory =
private readonly List<InventoryItem> BaseInventory =
[
new("Paperclips", 5, 100),
new("Cars", 15000, 100),
new("Computers", 500, 100)
];

[Fact]
public async Task ShouldHandleMultipleExternalEvents_Simple()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
configureRuntime: opt =>
{
opt.RegisterWorkflow<MultiEventWorkflow>();
},
configureClient: (sp, clientBuilder) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(MultiEventWorkflow), workflowInstanceId);
await Task.Delay(TimeSpan.FromSeconds(2));

// Raise multiple events
await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event1", "FirstData");
await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event2", 42);
await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event3", true);

var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId);
Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
var output = result.ReadOutputAs<string>();
Assert.Equal("FirstData-42-True", output);
}

[Fact]
public async Task ShouldHandleStandardWorkflowsWithDependencyInjection()
{
Expand Down Expand Up @@ -125,6 +168,61 @@ public enum ApprovalResult
Approved = 1,
Rejected = 2
}

[Fact]
public async Task ShouldHandleMultipleExternalEvents()
{
var options = new DaprRuntimeOptions();
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

var harness = new DaprHarnessBuilder(options).BuildWorkflow(componentsDir);
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
configureRuntime: opt =>
{
opt.RegisterWorkflow<MultiEventWorkflow>();
},
configureClient: (sp, clientBuilder) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
clientBuilder.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(MultiEventWorkflow), workflowInstanceId);
await Task.Delay(TimeSpan.FromSeconds(2));

// Raise multiple events
await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event1", "FirstData");
await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event2", 42);
await daprWorkflowClient.RaiseEventAsync(workflowInstanceId, "Event3", true);

var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId);
Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);
var output = result.ReadOutputAs<string>();
Assert.Equal("FirstData-42-True", output);
}

private sealed class MultiEventWorkflow : Workflow<object?, string>
{
public override async Task<string> RunAsync(WorkflowContext context, object? input)
{
var event1 = await context.WaitForExternalEventAsync<string>("Event1");
var event2 = await context.WaitForExternalEventAsync<int>("Event2");
var event3 = await context.WaitForExternalEventAsync<bool>("Event3");

return $"{event1}-{event2}-{event3}";
}
}

internal sealed partial class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
Expand Down
Loading