Skip to content

Commit

Permalink
Proper workflow retry support in Dapr SDK (#1090)
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Gillum <[email protected]>
Co-authored-by: halspang <[email protected]>
  • Loading branch information
cgillum and halspang authored May 19, 2023
1 parent a2d3c3a commit e59c856
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 37 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
using WorkflowConsoleApp.Models;
Expand Down Expand Up @@ -27,7 +26,7 @@ public override async Task<InventoryResult> RunAsync(WorkflowActivityContext con
req.ItemName);

// Ensure that the store has items
InventoryItem item = await client.GetStateAsync<InventoryItem>(
InventoryItem item = await this.client.GetStateAsync<InventoryItem>(
storeName,
req.ItemName.ToLowerInvariant());

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Client;
using Dapr.Workflow;
using WorkflowConsoleApp.Models;
using Microsoft.Extensions.Logging;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ namespace WorkflowConsoleApp.Workflows
{
public class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
readonly WorkflowTaskOptions defaultActivityRetryOptions = new WorkflowTaskOptions
{
// NOTE: Beware that changing the number of retries is a breaking change for existing workflows.
RetryPolicy = new WorkflowRetryPolicy(
maxNumberOfAttempts: 3,
firstRetryInterval: TimeSpan.FromSeconds(5)),
};

public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;


// Notify the user that an order has come through
await context.CallActivityAsync(
nameof(NotifyActivity),
Expand All @@ -19,7 +26,8 @@ await context.CallActivityAsync(
// Determine if there is enough of the item available for purchase by checking the inventory
InventoryResult result = await context.CallActivityAsync<InventoryResult>(
nameof(ReserveInventoryActivity),
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity),
this.defaultActivityRetryOptions);

// If there is insufficient inventory, fail and let the user know
if (!result.Success)
Expand All @@ -34,14 +42,16 @@ await context.CallActivityAsync(
// There is enough inventory available so the user can purchase the item(s). Process their payment
await context.CallActivityAsync(
nameof(ProcessPaymentActivity),
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost),
this.defaultActivityRetryOptions);

try
{
// There is enough inventory available so the user can purchase the item(s). Process their payment
await context.CallActivityAsync(
nameof(UpdateInventoryActivity),
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost),
this.defaultActivityRetryOptions);
}
catch (WorkflowTaskFailedException e)
{
Expand Down
17 changes: 8 additions & 9 deletions examples/Workflow/WorkflowUnitTest/OrderProcessingTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Threading.Tasks;
using Dapr.Workflow;
using Microsoft.DurableTask;
using Moq;
using WorkflowConsoleApp.Activities;
using WorkflowConsoleApp.Models;
Expand All @@ -24,7 +23,7 @@ public async Task TestSuccessfulOrder()
// Mock the call to ReserveInventoryActivity
Mock<WorkflowContext> mockContext = new();
mockContext
.Setup(ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), It.IsAny<InventoryRequest>(), It.IsAny<TaskOptions>()))
.Setup(ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), It.IsAny<InventoryRequest>(), It.IsAny<WorkflowTaskOptions>()))
.Returns(Task.FromResult(inventoryResult));

// Run the workflow directly
Expand All @@ -36,17 +35,17 @@ public async Task TestSuccessfulOrder()

// Verify that ReserveInventoryActivity was called with a specific input
mockContext.Verify(
ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), expectedInventoryRequest, It.IsAny<TaskOptions>()),
ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), expectedInventoryRequest, It.IsAny<WorkflowTaskOptions>()),
Times.Once());

// Verify that ProcessPaymentActivity was called with a specific input
mockContext.Verify(
ctx => ctx.CallActivityAsync(nameof(ProcessPaymentActivity), expectedPaymentRequest, It.IsAny<TaskOptions>()),
ctx => ctx.CallActivityAsync(nameof(ProcessPaymentActivity), expectedPaymentRequest, It.IsAny<WorkflowTaskOptions>()),
Times.Once());

// Verify that there were two calls to NotifyActivity
mockContext.Verify(
ctx => ctx.CallActivityAsync(nameof(NotifyActivity), It.IsAny<Notification>(), It.IsAny<TaskOptions>()),
ctx => ctx.CallActivityAsync(nameof(NotifyActivity), It.IsAny<Notification>(), It.IsAny<WorkflowTaskOptions>()),
Times.Exactly(2));
}

Expand All @@ -61,25 +60,25 @@ public async Task TestInsufficientInventory()
// Mock the call to ReserveInventoryActivity
Mock<WorkflowContext> mockContext = new();
mockContext
.Setup(ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), It.IsAny<InventoryRequest>(), It.IsAny<TaskOptions>()))
.Setup(ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), It.IsAny<InventoryRequest>(), It.IsAny<WorkflowTaskOptions>()))
.Returns(Task.FromResult(inventoryResult));

// Run the workflow directly
OrderResult result = await new OrderProcessingWorkflow().RunAsync(mockContext.Object, order);

// Verify that ReserveInventoryActivity was called with a specific input
mockContext.Verify(
ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), expectedInventoryRequest, It.IsAny<TaskOptions>()),
ctx => ctx.CallActivityAsync<InventoryResult>(nameof(ReserveInventoryActivity), expectedInventoryRequest, It.IsAny<WorkflowTaskOptions>()),
Times.Once());

// Verify that ProcessPaymentActivity was never called
mockContext.Verify(
ctx => ctx.CallActivityAsync(nameof(ProcessPaymentActivity), It.IsAny<PaymentRequest>(), It.IsAny<TaskOptions>()),
ctx => ctx.CallActivityAsync(nameof(ProcessPaymentActivity), It.IsAny<PaymentRequest>(), It.IsAny<WorkflowTaskOptions>()),
Times.Never());

// Verify that there were two calls to NotifyActivity
mockContext.Verify(
ctx => ctx.CallActivityAsync(nameof(NotifyActivity), It.IsAny<Notification>(), It.IsAny<TaskOptions>()),
ctx => ctx.CallActivityAsync(nameof(NotifyActivity), It.IsAny<Notification>(), It.IsAny<WorkflowTaskOptions>()),
Times.Exactly(2));
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/Dapr.Workflow/DaprWorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ internal DaprWorkflowContext(TaskOrchestrationContext innerContext)

public override bool IsReplaying => this.innerContext.IsReplaying;

public override Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
public override Task CallActivityAsync(string name, object? input = null, WorkflowTaskOptions? options = null)
{
return WrapExceptions(this.innerContext.CallActivityAsync(name, input, options));
return WrapExceptions(this.innerContext.CallActivityAsync(name, input, options?.ToDurableTaskOptions()));
}

public override Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null)
public override Task<T> CallActivityAsync<T>(string name, object? input = null, WorkflowTaskOptions? options = null)
{
return WrapExceptions(this.innerContext.CallActivityAsync<T>(name, input, options));
return WrapExceptions(this.innerContext.CallActivityAsync<T>(name, input, options?.ToDurableTaskOptions()));
}

public override Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -75,14 +75,14 @@ public override void SetCustomStatus(object? customStatus)
this.innerContext.SetCustomStatus(customStatus);
}

public override Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null)
public override Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, ChildWorkflowTaskOptions? options = null)
{
return WrapExceptions(this.innerContext.CallSubOrchestratorAsync<TResult>(workflowName, input, options));
return WrapExceptions(this.innerContext.CallSubOrchestratorAsync<TResult>(workflowName, input, options?.ToDurableTaskOptions()));
}

public override Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
public override Task CallChildWorkflowAsync(string workflowName, object? input = null, ChildWorkflowTaskOptions? options = null)
{
return WrapExceptions(this.innerContext.CallSubOrchestratorAsync(workflowName, input, options));
return WrapExceptions(this.innerContext.CallSubOrchestratorAsync(workflowName, input, options?.ToDurableTaskOptions()));
}

public override void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true)
Expand Down
27 changes: 16 additions & 11 deletions src/Dapr.Workflow/WorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ namespace Dapr.Workflow
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.DurableTask;

/// <summary>
/// Context object used by workflow implementations to perform actions such as scheduling activities, durable timers, waiting for
Expand Down Expand Up @@ -101,7 +100,7 @@ public abstract class WorkflowContext
/// The activity failed with an unhandled exception. The details of the failure can be found in the
/// <see cref="WorkflowTaskFailedException.FailureDetails"/> property.
/// </exception>
public virtual Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null)
public virtual Task CallActivityAsync(string name, object? input = null, WorkflowTaskOptions? options = null)
{
return this.CallActivityAsync<object>(name, input, options);
}
Expand All @@ -110,7 +109,7 @@ public virtual Task CallActivityAsync(string name, object? input = null, TaskOpt
/// A task that completes when the activity completes or fails. The result of the task is the activity's return value.
/// </returns>
/// <inheritdoc cref="CallActivityAsync"/>
public abstract Task<T> CallActivityAsync<T>(string name, object? input = null, TaskOptions? options = null);
public abstract Task<T> CallActivityAsync<T>(string name, object? input = null, WorkflowTaskOptions? options = null);

/// <summary>
/// Creates a durable timer that expires after the specified delay.
Expand Down Expand Up @@ -212,8 +211,11 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
/// <typeparam name="TResult">
/// The type into which to deserialize the child workflow's output.
/// </typeparam>
/// <inheritdoc cref="CallChildWorkflowAsync(string, object?, TaskOptions?)"/>
public abstract Task<TResult> CallChildWorkflowAsync<TResult>(string workflowName, object? input = null, TaskOptions? options = null);
/// <inheritdoc cref="CallChildWorkflowAsync(string, object?, ChildWorkflowTaskOptions?)"/>
public abstract Task<TResult> CallChildWorkflowAsync<TResult>(
string workflowName,
object? input = null,
ChildWorkflowTaskOptions? options = null);

/// <summary>
/// Executes the specified workflow as a child workflow.
Expand All @@ -222,7 +224,8 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
/// <para>
/// In addition to activities, workflows can schedule other workflows as <i>child workflows</i>.
/// A child workflow has its own instance ID, history, and status that is independent of the parent workflow
/// that started it.
/// that started it. You can use <see cref="ChildWorkflowTaskOptions.InstanceId" /> to specify an instance ID
/// for the child workflow. Otherwise, the instance ID will be randomly generated.
/// </para><para>
/// Child workflows have many benefits:
/// <list type="bullet">
Expand All @@ -237,15 +240,14 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
/// exception. Child workflows also support automatic retry policies.
/// </para><para>
/// Because child workflows are independent of their parents, terminating a parent workflow does not affect
/// any child workflows. You must terminate each child workflow independently using its instance ID, which is
/// specified by supplying <see cref="SubOrchestrationOptions" /> in place of <see cref="TaskOptions" />.
/// any child workflows. You must terminate each child workflow independently using its instance ID, which
/// is specified by <see cref="ChildWorkflowTaskOptions.InstanceId" />.
/// </para>
/// </remarks>
/// <param name="workflowName">The name of the workflow to call.</param>
/// <param name="input">The serializable input to pass to the child workflow.</param>
/// <param name="options">
/// Additional options that control the execution and processing of the child workflow. Callers can choose to
/// supply the derived type <see cref="SubOrchestrationOptions" />.
/// Additional options that control the execution and processing of the child workflow.
/// </param>
/// <returns>A task that completes when the child workflow completes or fails.</returns>
/// <exception cref="ArgumentException">The specified workflow does not exist.</exception>
Expand All @@ -256,7 +258,10 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
/// The child workflow failed with an unhandled exception. The details of the failure can be found in the
/// <see cref="WorkflowTaskFailedException.FailureDetails"/> property.
/// </exception>
public virtual Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null)
public virtual Task CallChildWorkflowAsync(
string workflowName,
object? input = null,
ChildWorkflowTaskOptions? options = null)
{
return this.CallChildWorkflowAsync<object>(workflowName, input, options);
}
Expand Down
Loading

0 comments on commit e59c856

Please sign in to comment.