Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions examples/Workflow/WorkflowWebApp/Activities/NotifyActivity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace WorkflowWebApp.Activities
{
using System.Threading.Tasks;
using Dapr.Workflow;

record Notification(string Message);

class NotifyActivity : WorkflowActivity<Notification, object>
{
readonly ILogger logger;

public NotifyActivity(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<NotifyActivity>();
}

public override Task<object> RunAsync(WorkflowActivityContext context, Notification notification)
{
this.logger.LogInformation(notification.Message);

return Task.FromResult<object>(null);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
namespace WorkflowWebApp.Activities
{
using System.Threading.Tasks;
using Dapr.Workflow;

record PaymentRequest(string RequestId, double Amount, string Currency);

class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
{
readonly ILogger logger;

public ProcessPaymentActivity(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
}

public override Task<object> RunAsync(WorkflowActivityContext context, PaymentRequest req)
{
this.logger.LogInformation(
"Processing payment: {requestId}, {amount}, {currency}",
req.RequestId,
req.Amount,
req.Currency);

return Task.FromResult<object>(null);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace WorkflowWebApp.Activities
{
using System.Threading.Tasks;
using Dapr.Workflow;

record InventoryRequest(string RequestId, string Name, int Quantity);
record InventoryResult(bool Success);

class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
{
readonly ILogger logger;

public ReserveInventoryActivity(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
}

public override Task<InventoryResult> RunAsync(WorkflowActivityContext context, InventoryRequest req)
{
this.logger.LogInformation(
"Reserving inventory: {requestId}, {name}, {quantity}",
req.RequestId,
req.Name,
req.Quantity);

return Task.FromResult(new InventoryResult(true));
}
}
}
81 changes: 56 additions & 25 deletions examples/Workflow/WorkflowWebApp/Program.cs
Original file line number Diff line number Diff line change
@@ -1,51 +1,82 @@
using Dapr.Workflow;
using System.Text.Json.Serialization;
using Dapr.Workflow;
using Microsoft.AspNetCore.Mvc;
using WorkflowWebApp.Activities;
using WorkflowWebApp.Workflows;
using JsonOptions = Microsoft.AspNetCore.Http.Json.JsonOptions;

// The workflow host is a background service that connects to the sidecar over gRPC
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);

// Configure HTTP JSON options.
builder.Services.Configure<JsonOptions>(options =>
{
options.SerializerOptions.Converters.Add(new JsonStringEnumConverter());
options.SerializerOptions.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
});

// Dapr workflows are registered as part of the service configuration
builder.Services.AddDaprWorkflow(options =>
{
// Example of registering a "PlaceOrder" workflow function
options.RegisterWorkflow<string, string>("PlaceOrder", implementation: async (context, input) =>
{
// In real life there are other steps related to placing an order, like reserving
// inventory and charging the customer credit card etc. But let's keep it simple ;)
return await context.CallActivityAsync<string>("ShipProduct", "Coffee Beans");
});
// Note that it's also possible to register a lambda function as the workflow
// or activity implementation instead of a class.
options.RegisterWorkflow<OrderProcessingWorkflow>();
Comment thread
cgillum marked this conversation as resolved.

// Example of registering a "ShipProduct" workflow activity function
options.RegisterActivity<string, string>("ShipProduct", implementation: (context, input) =>
{
return Task.FromResult($"We are shipping {input} to the customer using our hoard of drones!");
});
// These are the activities that get invoked by the workflow(s).
options.RegisterActivity<NotifyActivity>();
options.RegisterActivity<ReserveInventoryActivity>();
options.RegisterActivity<ProcessPaymentActivity>();
});

WebApplication app = builder.Build();

// POST starts new workflow instances
app.MapPost("/order", async (HttpContext context, WorkflowClient client) =>
// POST starts new order workflow instance
app.MapPost("/orders", async (WorkflowEngineClient client, [FromBody] OrderPayload orderInfo) =>
{
string id = Guid.NewGuid().ToString()[..8];
await client.ScheduleNewWorkflowAsync("PlaceOrder", id);
if (orderInfo?.Name == null)
{
return Results.BadRequest(new
{
message = "Order data was missing from the request",
example = new OrderPayload("Paperclips", 99.95),
});
}

// Randomly generated order ID that is 8 characters long.
string orderId = Guid.NewGuid().ToString()[..8];
await client.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow), orderId, orderInfo);

// return an HTTP 202 and a Location header to be used for status query
return Results.AcceptedAtRoute("GetOrderEndpoint", new { id });
return Results.AcceptedAtRoute("GetOrderInfoEndpoint", new { orderId });
});

// GET fetches metadata for specific order workflow instances
app.MapGet("/order/{id}", async (string id, WorkflowClient client) =>
// GET fetches state for order workflow to report status
app.MapGet("/orders/{orderId}", async (string orderId, WorkflowEngineClient client) =>
{
WorkflowMetadata metadata = await client.GetWorkflowMetadataAsync(id, getInputsAndOutputs: true);
if (metadata.Exists)
WorkflowState state = await client.GetWorkflowStateAsync(orderId, true);
if (!state.Exists)
{
return Results.NotFound($"No order with ID = '{orderId}' was found.");
}

var httpResponsePayload = new
{
details = state.ReadInputAs<OrderPayload>(),
status = state.RuntimeStatus.ToString(),
result = state.ReadOutputAs<OrderResult>(),
};

if (state.IsWorkflowRunning)
{
return Results.Ok(metadata);
// HTTP 202 Accepted - async polling clients should keep polling for status
return Results.AcceptedAtRoute("GetOrderInfoEndpoint", new { orderId }, httpResponsePayload);
}
else
{
return Results.NotFound($"No workflow created for order with ID = '{id}' was found.");
// HTTP 200 OK
return Results.Ok(httpResponsePayload);
}
}).WithName("GetOrderEndpoint");
}).WithName("GetOrderInfoEndpoint");
Comment thread
cgillum marked this conversation as resolved.

app.Run();

Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:10080"
"applicationUrl": "http://localhost:10080"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
namespace WorkflowWebApp.Workflows
{
using System.Threading.Tasks;
using Dapr.Workflow;
using WorkflowWebApp.Activities;

record OrderPayload(string Name, double TotalCost, int Quantity = 1);
record OrderResult(bool Processed);

class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;

await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}"));
Comment thread
cgillum marked this conversation as resolved.

string requestId = context.InstanceId;

InventoryResult result = await context.CallActivityAsync<InventoryResult>(
nameof(ReserveInventoryActivity),
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
if (!result.Success)
{
// End the workflow here since we don't have sufficient inventory
context.SetCustomStatus($"Insufficient inventory for {order.Name}");
return new OrderResult(Processed: false);
}

await context.CallActivityAsync(
nameof(ProcessPaymentActivity),
new PaymentRequest(RequestId: orderId, order.TotalCost, "USD"));

await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} processed successfully!"));

// End the workflow with a success result
return new OrderResult(Processed: true);
}
}
}
137 changes: 137 additions & 0 deletions src/Dapr.Workflow/Workflow.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Workflow
{
using System;
using System.Threading.Tasks;

/// <summary>
/// Common interface for workflow implementations.
/// </summary>
/// <remarks>
/// Users should not implement workflows using this interface, directly.
/// Instead, <see cref="Workflow{TInput, TOutput}"/> should be used to implement workflows.
/// </remarks>
public interface IWorkflow

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't want users to use this, why not make it internal or just provide the abstract class?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't make it internal because there are public APIs that depend on it (like the RegisterWorkflow<T>() API). I believe there was some reason I needed this because of constraints related to how generics work in .NET, though I can't remember exactly what it was...

{
/// <summary>
/// Gets the type of the input parameter that this workflow accepts.
/// </summary>
Type InputType { get; }

/// <summary>
/// Gets the type of the return value that this workflow produces.
/// </summary>
Type OutputType { get; }

/// <summary>
/// Invokes the workflow with the specified context and input.
/// </summary>
/// <param name="context">The workflow's context.</param>
/// <param name="input">The workflow's input.</param>
/// <returns>Returns the workflow output as the result of a <see cref="Task"/>.</returns>
Task<object?> RunAsync(WorkflowContext context, object? input);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my own edification, the reason we're doing it this way is so that when the customer extends the abstract class, they can still specify the type variables, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main reason we're doing this is so that we can have common code that can run the user's code without needing to worry about the specific type parameters. I don't recall exactly, but I think there wasn't a way for me to write this common code when generics were involved. It was a problem I originally encountered in the design of the Durable Task SDK that we depend on.

}

/// <summary>
/// Represents the base class for workflows.
/// </summary>
/// <remarks>
/// <para>
/// Workflows describe how actions are executed and the order in which actions are executed. Workflows
/// don't call into external services or do complex computation directly. Rather, they delegate these tasks to
/// <em>activities</em>, which perform the actual work.
/// </para>
/// <para>
/// Workflows can be scheduled using the Dapr client or by other workflows as child-workflows using the
/// <see cref="WorkflowContext.CallChildWorkflowAsync"/> method.
/// </para>
/// <para>
/// Workflows may be replayed multiple times to rebuild their local state after being reloaded into memory.
/// workflow code must therefore be <em>deterministic</em> to ensure no unexpected side effects from execution

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I remember a comment about workflows must be thought as idempotent -- while the activities themselves don't need to be. Not sure if this idempotency is a concept we want to highlight here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, workflows don't need to be idempotent because they don't have any external side effects, which essentially guarantees effectively-once execution guarantees. The only restriction therefore is that workflows are deterministic.

Activities don't need to be deterministic, but it is a good practice to make them idempotent since they have at-least-once execution guarantees. I believe we discuss this in the comments for the WorkflowActivity class.

/// replay. To account for this behavior, there are several coding constraints to be aware of:
/// <list type="bullet">
/// <item>
/// A workflow must not generate random numbers or random GUIDs, get the current date, read environment
/// variables, or do anything else that might result in a different value if the code is replayed in the future.
/// Activities and built-in properties and methods on the <see cref="WorkflowContext"/> parameter, like
/// <see cref="WorkflowContext.CurrentUtcDateTime"/> and <see cref="WorkflowContext.NewGuid"/>,
/// can be used to work around these restrictions.
/// </item>
/// <item>
/// Workflow logic must be executed on the workflow thread (the thread that calls <see cref="RunAsync"/>.
/// Creating new threads, scheduling callbacks on worker pool threads, or awaiting non-workflow tasks is forbidden
/// and may result in failures or other unexpected behavior. Blocking the workflow thread may also result in unexpected
/// performance degredation. The use of <c>await</c> should be restricted to workflow tasks - i.e. tasks returned from
/// methods on the <see cref="WorkflowContext"/> parameter object or tasks that wrap these workflow tasks, like
/// <see cref="Task.WhenAll(Task[])"/> and <see cref="Task.WhenAny(Task[])"/>.
/// </item>
/// <item>
/// Avoid infinite loops as they could cause the application to run out of memory. Instead, ensure that loops are
/// bounded or use <see cref="WorkflowContext.ContinueAsNew"/> to restart the workflow with a new input.
/// </item>
/// <item>
/// Avoid logging normally in the workflow code because log messages will be duplicated on each replay.
/// Instead, write log statements when <see cref="WorkflowContext.IsReplaying"/> is <c>false</c>.
/// </item>
/// </list>
/// </para>
/// <para>
/// Workflow code is tightly coupled with its execution history so special care must be taken when making changes
/// to workflow code. For example, adding or removing activity tasks to a workflow's code may cause a
/// mismatch between code and history for in-flight workflows. To avoid potential issues related to workflow
/// versioning, consider applying the following code update strategies:
Comment on lines +91 to +94

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

/// <list type="bullet">
/// <item>
/// Deploy multiple versions of applications side-by-side allowing new code to run independently of old code.
/// </item>
/// <item>
/// Rather than changing existing workflows, create new workflows that implement the modified behavior.
/// </item>
/// <item>
/// Ensure all in-flight workflows are complete before applying code changes to existing workflow code.
/// </item>
/// <item>
/// If possible, only make changes to workflow code that won't impact its history or execution path. For
/// example, renaming variables or adding log statements have no impact on a workflow's execution path and
/// are safe to apply to existing workflows.
/// </item>
/// </list>
/// </para>
/// </remarks>
/// <typeparam name="TInput">The type of the input parameter that this workflow accepts. This type must be JSON-serializable.</typeparam>
/// <typeparam name="TOutput">The type of the return value that this workflow produces. This type must be JSON-serializable.</typeparam>
Comment thread
cgillum marked this conversation as resolved.
public abstract class Workflow<TInput, TOutput> : IWorkflow
{
/// <inheritdoc/>
Type IWorkflow.InputType => typeof(TInput);

/// <inheritdoc/>
Type IWorkflow.OutputType => typeof(TOutput);

/// <inheritdoc/>
async Task<object?> IWorkflow.RunAsync(WorkflowContext context, object? input)
{
return await this.RunAsync(context, (TInput)input!);
}

/// <summary>
/// Override to implement workflow logic.
/// </summary>
/// <param name="context">The workflow context.</param>
/// <param name="input">The deserialized workflow input.</param>
/// <returns>The output of the workflow as a task.</returns>
public abstract Task<TOutput> RunAsync(WorkflowContext context, TInput input);
}
}
Loading