diff --git a/examples/README.md b/examples/README.md index 3d73bbc9a..44eb57a85 100644 --- a/examples/README.md +++ b/examples/README.md @@ -6,4 +6,5 @@ This repository contains a samples that highlight the Dapr .NET SDK capabilities |----------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | [1. Client](./Client) | The client example shows how to make Dapr calls to publish events, save state, get state and delete state using a Dapr client. | [2. Actor](./Actor) | Demonstrates creating virtual actors that encapsulate code and state. | -| [3. ASP.NET Core](./AspNetCore) | Demonstrates ASP.NET Core integration with Dapr by creating Controllers and Routes. +| [3. ASP.NET Core](./AspNetCore) | Demonstrates ASP.NET Core integration with Dapr by creating Controllers and Routes. | +| [4. Workflow](./Workflow) | Demonstrates creating durable, long-running Dapr workflows using code. | diff --git a/examples/Workflow/README.md b/examples/Workflow/README.md new file mode 100644 index 000000000..87cc9b033 --- /dev/null +++ b/examples/Workflow/README.md @@ -0,0 +1,115 @@ +# Dapr Workflow with ASP.NET Core sample + +This Dapr workflow example shows how to create a Dapr workflow (`Workflow`) and invoke it using ASP.NET Core web APIs. + +## Prerequisites + +- [.NET 6+](https://dotnet.microsoft.com/download) installed +- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/) +- [Initialized Dapr environment](https://docs.dapr.io/getting-started/install-dapr-selfhost/) +- [Dapr .NET SDK](https://github.com/dapr/dotnet-sdk/) + +## Projects in sample + +This sample contains a single [WorkflowWebApp](./WorkflowWebApp) ASP.NET Core project. It combines both the workflow implementations and the web APIs for starting and querying workflows instances. + +The main `Program.cs` file contains the main setup of the app, including the registration of the web APIs and the registration of the workflow and workflow activities. The workflow definition is found in `Workflows` directory and the workflow activity definitions are found in the `Activities` directory. + +## Running the example + +To run the workflow web app locally, run this command in the `WorkflowWebApp` directory: + +```sh +dapr run --app-id wfwebapp dotnet run +``` + +The application will listen for HTTP requests at `http://localhost:10080`. + +To start a workflow, use the following command to send an HTTP POST request, which triggers an HTTP API that starts the workflow using the Dapr Workflow client. Two identical `curl` commands are shown, one for Linux/macOS (bash) and the other for Windows (PowerShell). The body of the request is used as the input of the workflow. + +On Linux/macOS (bash): + +```bash +curl -i -X POST http://localhost:10080/orders \ + -H "Content-Type: application/json" \ + -d '{"name": "Paperclips", "totalCost": 99.95, "quantity": 1}' +``` + +On Windows (PowerShell): + +```powershell +curl -i -X POST http://localhost:10080/orders ` + -H "Content-Type: application/json" ` + -d '{"name": "Paperclips", "totalCost": 99.95, "quantity": 1}' +``` + +If successful, you should see a response like the following, which contains a `Location` header pointing to a status endpoint for the workflow that was created with a randomly generated 8-digit ID: + +```http +HTTP/1.1 202 Accepted +Content-Length: 0 +Date: Tue, 24 Jan 2023 00:02:02 GMT +Server: Kestrel +Location: http://localhost:10080/orders/cdcce425 +``` + +Next, send an HTTP request to the URL in the `Location` header in the previous HTTP response, like in the following example: + +```bash +curl -i http://localhost:10080/orders/cdcce425 +``` + +If the workflow has completed running, you should see the following output (formatted for readability): + +```http +HTTP/1.1 200 OK +Content-Type: application/json; charset=utf-8 +Date: Tue, 24 Jan 2023 00:10:53 GMT +Server: Kestrel +Transfer-Encoding: chunked + +{ + "details": { + "name": "Paperclips", + "quantity": 1, + "totalCost": 99.95 + }, + "result": { + "processed": true + }, + "status": "Completed" +} +``` + +If the workflow hasn't completed yet, you might instead see the following: + +```http +HTTP/1.1 202 Accepted +Content-Type: application/json; charset=utf-8 +Date: Tue, 24 Jan 2023 00:17:49 GMT +Location: http://localhost:10080/orders/cdcce425 +Server: Kestrel +Transfer-Encoding: chunked + +{ + "details": { + "name": "Paperclips", + "quantity": 1, + "totalCost": 99.95 + }, + "status": "Running" +} +``` + +When the workflow has completed, the stdout of the web app should look like the following: + +```log +info: WorkflowWebApp.Activities.NotifyActivity[0] + Received order cdcce425 for Paperclips at $99.95 +info: WorkflowWebApp.Activities.ReserveInventoryActivity[0] + Reserving inventory: cdcce425, Paperclips, 1 +info: WorkflowWebApp.Activities.ProcessPaymentActivity[0] + Processing payment: cdcce425, 99.95, USD +info: WorkflowWebApp.Activities.NotifyActivity[0] + Order cdcce425 processed successfully! +``` diff --git a/examples/Workflow/WorkflowWebApp/Activities/NotifyActivity.cs b/examples/Workflow/WorkflowWebApp/Activities/NotifyActivity.cs new file mode 100644 index 000000000..44846c20e --- /dev/null +++ b/examples/Workflow/WorkflowWebApp/Activities/NotifyActivity.cs @@ -0,0 +1,24 @@ +namespace WorkflowWebApp.Activities +{ + using System.Threading.Tasks; + using Dapr.Workflow; + + record Notification(string Message); + + class NotifyActivity : WorkflowActivity + { + readonly ILogger logger; + + public NotifyActivity(ILoggerFactory loggerFactory) + { + this.logger = loggerFactory.CreateLogger(); + } + + public override Task RunAsync(WorkflowActivityContext context, Notification notification) + { + this.logger.LogInformation(notification.Message); + + return Task.FromResult(null); + } + } +} diff --git a/examples/Workflow/WorkflowWebApp/Activities/ProcessPaymentActivity.cs b/examples/Workflow/WorkflowWebApp/Activities/ProcessPaymentActivity.cs new file mode 100644 index 000000000..16075e87f --- /dev/null +++ b/examples/Workflow/WorkflowWebApp/Activities/ProcessPaymentActivity.cs @@ -0,0 +1,35 @@ +namespace WorkflowWebApp.Activities +{ + using System.Threading.Tasks; + using Dapr.Workflow; + + record PaymentRequest(string RequestId, double Amount, string Currency); + + class ProcessPaymentActivity : WorkflowActivity + { + readonly ILogger logger; + + public ProcessPaymentActivity(ILoggerFactory loggerFactory) + { + this.logger = loggerFactory.CreateLogger(); + } + + public override async Task RunAsync(WorkflowActivityContext context, PaymentRequest req) + { + this.logger.LogInformation( + "Processing payment: {requestId}, {amount}, {currency}", + req.RequestId, + req.Amount, + req.Currency); + + // Simulate slow processing + await Task.Delay(TimeSpan.FromSeconds(7)); + + this.logger.LogInformation( + "Payment for request ID '{requestId}' processed successfully", + req.RequestId); + + return null; + } + } +} diff --git a/examples/Workflow/WorkflowWebApp/Activities/ReserveInventoryActivity.cs b/examples/Workflow/WorkflowWebApp/Activities/ReserveInventoryActivity.cs new file mode 100644 index 000000000..19f57a051 --- /dev/null +++ b/examples/Workflow/WorkflowWebApp/Activities/ReserveInventoryActivity.cs @@ -0,0 +1,32 @@ +namespace WorkflowWebApp.Activities +{ + using System.Threading.Tasks; + using Dapr.Workflow; + + record InventoryRequest(string RequestId, string Name, int Quantity); + record InventoryResult(bool Success); + + class ReserveInventoryActivity : WorkflowActivity + { + readonly ILogger logger; + + public ReserveInventoryActivity(ILoggerFactory loggerFactory) + { + this.logger = loggerFactory.CreateLogger(); + } + + public override async Task RunAsync(WorkflowActivityContext context, InventoryRequest req) + { + this.logger.LogInformation( + "Reserving inventory: {requestId}, {name}, {quantity}", + req.RequestId, + req.Name, + req.Quantity); + + // Simulate slow processing + await Task.Delay(TimeSpan.FromSeconds(2)); + + return new InventoryResult(true); + } + } +} diff --git a/examples/Workflow/WorkflowWebApp/Program.cs b/examples/Workflow/WorkflowWebApp/Program.cs index 838141baf..119bc2af4 100644 --- a/examples/Workflow/WorkflowWebApp/Program.cs +++ b/examples/Workflow/WorkflowWebApp/Program.cs @@ -1,51 +1,82 @@ -using Dapr.Workflow; +using System.Text.Json.Serialization; +using Dapr.Workflow; +using Microsoft.AspNetCore.Mvc; +using WorkflowWebApp.Activities; +using WorkflowWebApp.Workflows; +using JsonOptions = Microsoft.AspNetCore.Http.Json.JsonOptions; // The workflow host is a background service that connects to the sidecar over gRPC WebApplicationBuilder builder = WebApplication.CreateBuilder(args); +// Configure HTTP JSON options. +builder.Services.Configure(options => +{ + options.SerializerOptions.Converters.Add(new JsonStringEnumConverter()); + options.SerializerOptions.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull; +}); + // Dapr workflows are registered as part of the service configuration builder.Services.AddDaprWorkflow(options => { - // Example of registering a "PlaceOrder" workflow function - options.RegisterWorkflow("PlaceOrder", implementation: async (context, input) => - { - // In real life there are other steps related to placing an order, like reserving - // inventory and charging the customer credit card etc. But let's keep it simple ;) - return await context.CallActivityAsync("ShipProduct", "Coffee Beans"); - }); + // Note that it's also possible to register a lambda function as the workflow + // or activity implementation instead of a class. + options.RegisterWorkflow(); - // Example of registering a "ShipProduct" workflow activity function - options.RegisterActivity("ShipProduct", implementation: (context, input) => - { - return Task.FromResult($"We are shipping {input} to the customer using our hoard of drones!"); - }); + // These are the activities that get invoked by the workflow(s). + options.RegisterActivity(); + options.RegisterActivity(); + options.RegisterActivity(); }); WebApplication app = builder.Build(); -// POST starts new workflow instances -app.MapPost("/order", async (HttpContext context, WorkflowClient client) => +// POST starts new order workflow instance +app.MapPost("/orders", async (WorkflowEngineClient client, [FromBody] OrderPayload orderInfo) => { - string id = Guid.NewGuid().ToString()[..8]; - await client.ScheduleNewWorkflowAsync("PlaceOrder", id); + if (orderInfo?.Name == null) + { + return Results.BadRequest(new + { + message = "Order data was missing from the request", + example = new OrderPayload("Paperclips", 99.95), + }); + } + + // Randomly generated order ID that is 8 characters long. + string orderId = Guid.NewGuid().ToString()[..8]; + await client.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow), orderId, orderInfo); // return an HTTP 202 and a Location header to be used for status query - return Results.AcceptedAtRoute("GetOrderEndpoint", new { id }); + return Results.AcceptedAtRoute("GetOrderInfoEndpoint", new { orderId }); }); -// GET fetches metadata for specific order workflow instances -app.MapGet("/order/{id}", async (string id, WorkflowClient client) => +// GET fetches state for order workflow to report status +app.MapGet("/orders/{orderId}", async (string orderId, WorkflowEngineClient client) => { - WorkflowMetadata metadata = await client.GetWorkflowMetadataAsync(id, getInputsAndOutputs: true); - if (metadata.Exists) + WorkflowState state = await client.GetWorkflowStateAsync(orderId, true); + if (!state.Exists) + { + return Results.NotFound($"No order with ID = '{orderId}' was found."); + } + + var httpResponsePayload = new + { + details = state.ReadInputAs(), + status = state.RuntimeStatus.ToString(), + result = state.ReadOutputAs(), + }; + + if (state.IsWorkflowRunning) { - return Results.Ok(metadata); + // HTTP 202 Accepted - async polling clients should keep polling for status + return Results.AcceptedAtRoute("GetOrderInfoEndpoint", new { orderId }, httpResponsePayload); } else { - return Results.NotFound($"No workflow created for order with ID = '{id}' was found."); + // HTTP 200 OK + return Results.Ok(httpResponsePayload); } -}).WithName("GetOrderEndpoint"); +}).WithName("GetOrderInfoEndpoint"); app.Run(); diff --git a/examples/Workflow/WorkflowWebApp/Properties/launchSettings.json b/examples/Workflow/WorkflowWebApp/Properties/launchSettings.json index 8254ce0fe..daf820a44 100644 --- a/examples/Workflow/WorkflowWebApp/Properties/launchSettings.json +++ b/examples/Workflow/WorkflowWebApp/Properties/launchSettings.json @@ -6,7 +6,7 @@ "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development" }, - "applicationUrl": "https://localhost:10080" + "applicationUrl": "http://localhost:10080" } } } \ No newline at end of file diff --git a/examples/Workflow/WorkflowWebApp/WorkflowWebApp.csproj b/examples/Workflow/WorkflowWebApp/WorkflowWebApp.csproj index 28e9dadb9..92a2999d3 100644 --- a/examples/Workflow/WorkflowWebApp/WorkflowWebApp.csproj +++ b/examples/Workflow/WorkflowWebApp/WorkflowWebApp.csproj @@ -1,4 +1,4 @@ - + @@ -6,7 +6,7 @@ Exe - net6 + net6 enable latest diff --git a/examples/Workflow/WorkflowWebApp/Workflows/OrderProcessingWorkflow.cs b/examples/Workflow/WorkflowWebApp/Workflows/OrderProcessingWorkflow.cs new file mode 100644 index 000000000..035f8911a --- /dev/null +++ b/examples/Workflow/WorkflowWebApp/Workflows/OrderProcessingWorkflow.cs @@ -0,0 +1,44 @@ +namespace WorkflowWebApp.Workflows +{ + using System.Threading.Tasks; + using Dapr.Workflow; + using WorkflowWebApp.Activities; + + record OrderPayload(string Name, double TotalCost, int Quantity = 1); + record OrderResult(bool Processed); + + class OrderProcessingWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, OrderPayload order) + { + string orderId = context.InstanceId; + + await context.CallActivityAsync( + nameof(NotifyActivity), + new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}")); + + string requestId = context.InstanceId; + + InventoryResult result = await context.CallActivityAsync( + nameof(ReserveInventoryActivity), + new InventoryRequest(RequestId: orderId, order.Name, order.Quantity)); + if (!result.Success) + { + // End the workflow here since we don't have sufficient inventory + context.SetCustomStatus($"Insufficient inventory for {order.Name}"); + return new OrderResult(Processed: false); + } + + await context.CallActivityAsync( + nameof(ProcessPaymentActivity), + new PaymentRequest(RequestId: orderId, order.TotalCost, "USD")); + + await context.CallActivityAsync( + nameof(NotifyActivity), + new Notification($"Order {orderId} processed successfully!")); + + // End the workflow with a success result + return new OrderResult(Processed: true); + } + } +} diff --git a/examples/Workflow/WorkflowWebApp/demo.http b/examples/Workflow/WorkflowWebApp/demo.http index 00210e840..2fa686385 100644 --- a/examples/Workflow/WorkflowWebApp/demo.http +++ b/examples/Workflow/WorkflowWebApp/demo.http @@ -1,6 +1,8 @@ ### Create new order -POST http://localhost:8080/workflow +POST http://localhost:10080/orders Content-Type: application/json +{"name": "Paperclips", "totalCost": 99.95, "quantity": 1} + ### Query placeholder -GET http://localhost:8080/workflow/XXX \ No newline at end of file +GET http://localhost:10080/orders/XXX \ No newline at end of file diff --git a/src/Dapr.Workflow/Dapr.Workflow.csproj b/src/Dapr.Workflow/Dapr.Workflow.csproj index 28e4e2513..5c7bf8e98 100644 --- a/src/Dapr.Workflow/Dapr.Workflow.csproj +++ b/src/Dapr.Workflow/Dapr.Workflow.csproj @@ -2,7 +2,7 @@ - netcoreapp3.1;net5;net6 + net6 enable Dapr.Workflow Dapr Workflow Authoring SDK diff --git a/src/Dapr.Workflow/Workflow.cs b/src/Dapr.Workflow/Workflow.cs new file mode 100644 index 000000000..6510414bf --- /dev/null +++ b/src/Dapr.Workflow/Workflow.cs @@ -0,0 +1,137 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Workflow +{ + using System; + using System.Threading.Tasks; + + /// + /// Common interface for workflow implementations. + /// + /// + /// Users should not implement workflows using this interface, directly. + /// Instead, should be used to implement workflows. + /// + public interface IWorkflow + { + /// + /// Gets the type of the input parameter that this workflow accepts. + /// + Type InputType { get; } + + /// + /// Gets the type of the return value that this workflow produces. + /// + Type OutputType { get; } + + /// + /// Invokes the workflow with the specified context and input. + /// + /// The workflow's context. + /// The workflow's input. + /// Returns the workflow output as the result of a . + Task RunAsync(WorkflowContext context, object? input); + } + + /// + /// Represents the base class for workflows. + /// + /// + /// + /// Workflows describe how actions are executed and the order in which actions are executed. Workflows + /// don't call into external services or do complex computation directly. Rather, they delegate these tasks to + /// activities, which perform the actual work. + /// + /// + /// Workflows can be scheduled using the Dapr client or by other workflows as child-workflows using the + /// method. + /// + /// + /// Workflows may be replayed multiple times to rebuild their local state after being reloaded into memory. + /// workflow code must therefore be deterministic to ensure no unexpected side effects from execution + /// replay. To account for this behavior, there are several coding constraints to be aware of: + /// + /// + /// A workflow must not generate random numbers or random GUIDs, get the current date, read environment + /// variables, or do anything else that might result in a different value if the code is replayed in the future. + /// Activities and built-in properties and methods on the parameter, like + /// and , + /// can be used to work around these restrictions. + /// + /// + /// Workflow logic must be executed on the workflow thread (the thread that calls . + /// Creating new threads, scheduling callbacks on worker pool threads, or awaiting non-workflow tasks is forbidden + /// and may result in failures or other unexpected behavior. Blocking the workflow thread may also result in unexpected + /// performance degredation. The use of await should be restricted to workflow tasks - i.e. tasks returned from + /// methods on the parameter object or tasks that wrap these workflow tasks, like + /// and . + /// + /// + /// Avoid infinite loops as they could cause the application to run out of memory. Instead, ensure that loops are + /// bounded or use to restart the workflow with a new input. + /// + /// + /// Avoid logging normally in the workflow code because log messages will be duplicated on each replay. + /// Instead, write log statements when is false. + /// + /// + /// + /// + /// Workflow code is tightly coupled with its execution history so special care must be taken when making changes + /// to workflow code. For example, adding or removing activity tasks to a workflow's code may cause a + /// mismatch between code and history for in-flight workflows. To avoid potential issues related to workflow + /// versioning, consider applying the following code update strategies: + /// + /// + /// Deploy multiple versions of applications side-by-side allowing new code to run independently of old code. + /// + /// + /// Rather than changing existing workflows, create new workflows that implement the modified behavior. + /// + /// + /// Ensure all in-flight workflows are complete before applying code changes to existing workflow code. + /// + /// + /// If possible, only make changes to workflow code that won't impact its history or execution path. For + /// example, renaming variables or adding log statements have no impact on a workflow's execution path and + /// are safe to apply to existing workflows. + /// + /// + /// + /// + /// The type of the input parameter that this workflow accepts. This type must be JSON-serializable. + /// The type of the return value that this workflow produces. This type must be JSON-serializable. + public abstract class Workflow : IWorkflow + { + /// + Type IWorkflow.InputType => typeof(TInput); + + /// + Type IWorkflow.OutputType => typeof(TOutput); + + /// + async Task IWorkflow.RunAsync(WorkflowContext context, object? input) + { + return await this.RunAsync(context, (TInput)input!); + } + + /// + /// Override to implement workflow logic. + /// + /// The workflow context. + /// The deserialized workflow input. + /// The output of the workflow as a task. + public abstract Task RunAsync(WorkflowContext context, TInput input); + } +} diff --git a/src/Dapr.Workflow/WorkflowActivity.cs b/src/Dapr.Workflow/WorkflowActivity.cs new file mode 100644 index 000000000..4daffe578 --- /dev/null +++ b/src/Dapr.Workflow/WorkflowActivity.cs @@ -0,0 +1,93 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Workflow +{ + using System; + using System.Threading.Tasks; + + /// + /// Common interface for workflow activity implementations. + /// + /// + /// Users should not implement workflow activities using this interface, directly. + /// Instead, should be used to implement workflow activities. + /// + public interface IWorkflowActivity + { + /// + /// Gets the type of the input parameter that this activity accepts. + /// + Type InputType { get; } + + /// + /// Gets the type of the return value that this activity produces. + /// + Type OutputType { get; } + + /// + /// Invokes the workflow activity with the specified context and input. + /// + /// The workflow activity's context. + /// The workflow activity's input. + /// Returns the workflow activity output as the result of a . + Task RunAsync(WorkflowActivityContext context, object? input); + } + + /// + /// Base class for workflow activities. + /// + /// + /// + /// Workflow activities are the basic unit of work in a workflow. Activities are the tasks that are + /// orchestrated in the business process. For example, you might create a workflow to process an order. The tasks + /// may involve checking the inventory, charging the customer, and creating a shipment. Each task would be a separate + /// activity. These activities may be executed serially, in parallel, or some combination of both. + /// + /// Unlike workflows, activities aren't restricted in the type of work you can do in them. Activities + /// are frequently used to make network calls or run CPU intensive operations. An activity can also return data back to + /// the workflow. The Dapr workflow engine guarantees that each called activity will be executed + /// at least once as part of a workflow's execution. + /// + /// Because activities only guarantee at least once execution, it's recommended that activity logic be implemented as + /// idempotent whenever possible. + /// + /// Activities are invoked by workflows using one of the + /// method overloads. + /// + /// + /// The type of the input parameter that this activity accepts. + /// The type of the return value that this activity produces. + public abstract class WorkflowActivity : IWorkflowActivity + { + /// + Type IWorkflowActivity.InputType => typeof(TInput); + + /// + Type IWorkflowActivity.OutputType => typeof(TOutput); + + /// + async Task IWorkflowActivity.RunAsync(WorkflowActivityContext context, object? input) + { + return await this.RunAsync(context, (TInput)input!); + } + + /// + /// Override to implement async (non-blocking) workflow activity logic. + /// + /// Provides access to additional context for the current activity execution. + /// The deserialized activity input. + /// The output of the activity as a task. + public abstract Task RunAsync(WorkflowActivityContext context, TInput input); + } +} diff --git a/src/Dapr.Workflow/WorkflowActivityContext.cs b/src/Dapr.Workflow/WorkflowActivityContext.cs index 267ba4cd9..eec32f008 100644 --- a/src/Dapr.Workflow/WorkflowActivityContext.cs +++ b/src/Dapr.Workflow/WorkflowActivityContext.cs @@ -14,8 +14,6 @@ namespace Dapr.Workflow { using System; - using System.Threading; - using System.Threading.Tasks; using Microsoft.DurableTask; /// diff --git a/src/Dapr.Workflow/WorkflowContext.cs b/src/Dapr.Workflow/WorkflowContext.cs index 9a9bd8a7e..f3439980c 100644 --- a/src/Dapr.Workflow/WorkflowContext.cs +++ b/src/Dapr.Workflow/WorkflowContext.cs @@ -34,7 +34,7 @@ internal WorkflowContext(TaskOrchestrationContext innerContext) /// /// Gets the name of the current workflow. /// - public TaskName Name => this.innerContext.Name; + public string Name => this.innerContext.Name; /// /// Gets the instance ID of the current workflow. @@ -44,47 +44,299 @@ internal WorkflowContext(TaskOrchestrationContext innerContext) /// /// Gets the current workflow time in UTC. /// + /// + /// The current workflow time is stored in the workflow history and this API will + /// return the same value each time it is called from a particular point in the workflow's + /// execution. It is a deterministic, replay-safe replacement for existing .NET APIs for getting + /// the current time, such as and + /// (which should not be used). + /// public DateTime CurrentUtcDateTime => this.innerContext.CurrentUtcDateTime; /// - /// Assigns a custom status value to the current workflow. + /// Gets a value indicating whether the workflow is currently replaying a previous execution. /// - public void SetCustomStatus(object? customStatus) => this.innerContext.SetCustomStatus(customStatus); + /// + /// + /// Workflow functions are "replayed" after being unloaded from memory to reconstruct local variable state. + /// During a replay, previously executed tasks will be completed automatically with previously seen values + /// that are stored in the workflow history. One the workflow reaches the point in the workflow logic + /// where it's no longer replaying existing history, the property will return false. + /// + /// You can use this property if you have logic that needs to run only when not replaying. For example, + /// certain types of application logging may become too noisy when duplicated as part of replay. The + /// application code could check to see whether the function is being replayed and then issue the log statements + /// when this value is false. + /// + /// + /// + /// true if the workflow is currently replaying a previous execution; otherwise false. + /// + public bool IsReplaying => this.innerContext.IsReplaying; + + /// + /// Asynchronously invokes an activity by name and with the specified input value. + /// + /// + /// + /// Activities are the basic unit of work in a workflow. Unlike workflows, which are not + /// allowed to do any I/O or call non-deterministic APIs, activities have no implementation restrictions. + /// + /// An activity may execute in the local machine or a remote machine. The exact behavior depends on the underlying + /// workflow engine, which is responsible for distributing tasks across machines. In general, you should never make + /// any assumptions about where an activity will run. You should also assume at-least-once execution guarantees for + /// activities, meaning that an activity may be executed twice if, for example, there is a process failure before + /// the activities result is saved into storage. + /// + /// Both the inputs and outputs of activities are serialized and stored in durable storage. It's highly recommended + /// to not include any sensitive data in activity inputs or outputs. It's also recommended to not use large payloads + /// for activity inputs and outputs, which can result in expensive serialization and network utilization. For data + /// that cannot be cheaply or safely persisted to storage, it's recommended to instead pass references + /// (for example, a URL to a storage blob/bucket) to the data and have activities fetch the data directly as part of their + /// implementation. + /// + /// + /// The name of the activity to call. + /// The serializable input to pass to the activity. + /// Additional options that control the execution and processing of the activity. + /// A task that completes when the activity completes or fails. + /// The specified activity does not exist. + /// + /// Thrown if the calling thread is not the workflow dispatch thread. + /// + /// + /// The activity failed with an unhandled exception. The details of the failure can be found in the + /// property. + /// + public Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null) + { + return this.innerContext.CallActivityAsync(name, input, options); + } + + /// + /// A task that completes when the activity completes or fails. The result of the task is the activity's return value. + /// + /// + public Task CallActivityAsync(string name, object? input = null, TaskOptions? options = null) + { + return this.innerContext.CallActivityAsync(name, input, options); + } /// /// Creates a durable timer that expires after the specified delay. /// - /// + /// The amount of time before the timer should expire. + /// Used to cancel the durable timer. + /// A task that completes when the durable timer expires. + /// + /// Thrown if the calling thread is not the workflow dispatch thread. + /// public Task CreateTimer(TimeSpan delay, CancellationToken cancellationToken = default) { return this.innerContext.CreateTimer(delay, cancellationToken); } /// - /// Waits for an event to be raised with name and returns the event data. + /// Creates a durable timer that expires at a set date and time. /// - /// - public Task WaitForExternalEventAsync(string eventName, TimeSpan timeout) + /// The time at which the timer should expire. + /// Used to cancel the durable timer. + /// + public Task CreateTimer(DateTime fireAt, CancellationToken cancellationToken) { - return this.innerContext.WaitForExternalEvent(eventName, timeout); + return this.innerContext.CreateTimer(fireAt, cancellationToken); } /// /// Waits for an event to be raised with name and returns the event data. /// - /// + /// + /// + /// External clients can raise events to a waiting workflow instance. Similarly, workflows can raise + /// events to other workflows using the method. + /// + /// If the current workflow instance is not yet waiting for an event named , + /// then the event will be saved in the workflow instance state and dispatched immediately when this method is + /// called. This event saving occurs even if the current workflow cancels the wait operation before the event is + /// received. + /// + /// Workflows can wait for the same event name multiple times, so waiting for multiple events with the same name + /// is allowed. Each external event received by a workflow will complete just one task returned by this method. + /// + /// + /// + /// The name of the event to wait for. Event names are case-insensitive. External event names can be reused any + /// number of times; they are not required to be unique. + /// + /// A CancellationToken to use to abort waiting for the event. + /// Any serializable type that represents the event payload. + /// + /// A task that completes when the external event is received. The value of the task is the deserialized event payload. + /// + /// + /// Thrown if the calling thread is not the workflow dispatch thread. + /// public Task WaitForExternalEventAsync(string eventName, CancellationToken cancellationToken = default) { return this.innerContext.WaitForExternalEvent(eventName, cancellationToken); } /// - /// Asynchronously invokes an activity by name and with the specified input value. + /// Waits for an event to be raised with name and returns the event data. /// - /// - public Task CallActivityAsync(TaskName name, object? input = null, TaskOptions? options = null) + /// + /// The name of the event to wait for. Event names are case-insensitive. External event names can be reused any + /// number of times; they are not required to be unique. + /// + /// The amount of time to wait before cancelling the external event task. + /// + public Task WaitForExternalEventAsync(string eventName, TimeSpan timeout) { - return this.innerContext.CallActivityAsync(name, input, options); + return this.innerContext.WaitForExternalEvent(eventName, timeout); + } + + /// + /// Raises an external event for the specified workflow instance. + /// + /// + /// The target workflow can handle the sent event using the + /// method. + /// + /// If the target workflow doesn't exist, the event will be silently dropped. + /// + /// + /// The ID of the workflow instance to send the event to. + /// The name of the event to wait for. Event names are case-insensitive. + /// The serializable payload of the external event. + public void SendEvent(string instanceId, string eventName, object payload) + { + this.SendEvent(instanceId, eventName, payload); + } + + /// + /// Assigns a custom status value to the current workflow. + /// + /// + /// The value is serialized and stored in workflow state and will + /// be made available to the workflow status query APIs. + /// + /// + /// A serializable value to assign as the custom status value or null to clear the custom status. + /// + /// + /// Thrown if the calling thread is not the workflow dispatch thread. + /// + public void SetCustomStatus(object? customStatus) + { + this.innerContext.SetCustomStatus(customStatus); + } + + /// + /// Executes the specified workflow as a child workflow and returns the result. + /// + /// + /// The type into which to deserialize the child workflow's output. + /// + /// + public Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null) + { + return this.innerContext.CallSubOrchestratorAsync(workflowName, input, options); + } + + /// + /// Executes the specified workflow as a child workflow. + /// + /// + /// + /// In addition to activities, workflows can schedule other workflows as child workflows. + /// A child workflow has its own instance ID, history, and status that is independent of the parent workflow + /// that started it. + /// + /// Child workflows have many benefits: + /// + /// You can split large workflows into a series of smaller child workflows, making your code more maintainable. + /// You can distribute workflow logic across multiple compute nodes concurrently, which is useful if + /// your workflow logic otherwise needs to coordinate a lot of tasks. + /// You can reduce memory usage and CPU overhead by keeping the history of parent workflow smaller. + /// + /// + /// The return value of a child workflow is its output. If a child workflow fails with an exception, then that + /// exception will be surfaced to the parent workflow, just like it is when an activity task fails with an + /// exception. Child workflows also support automatic retry policies. + /// + /// Because child workflows are independent of their parents, terminating a parent workflow does not affect + /// any child workflows. You must terminate each child workflow independently using its instance ID, which is + /// specified by supplying in place of . + /// + /// + /// The name of the workflow to call. + /// The serializable input to pass to the child workflow. + /// + /// Additional options that control the execution and processing of the child workflow. Callers can choose to + /// supply the derived type . + /// + /// A task that completes when the child workflow completes or fails. + /// The specified workflow does not exist. + /// + /// Thrown if the calling thread is not the workflow dispatch thread. + /// + /// + /// The child workflow failed with an unhandled exception. The details of the failure can be found in the + /// property. + /// + public Task CallChildWorkflowAsync(string workflowName, object? input = null, TaskOptions? options = null) + { + return this.innerContext.CallSubOrchestratorAsync(workflowName, input, options); + } + + /// + /// Restarts the workflow with a new input and clears its history. + /// + /// + /// + /// This method is primarily designed for eternal workflows, which are workflows that + /// may not ever complete. It works by restarting the workflow, providing it with a new input, + /// and truncating the existing workflow history. It allows the workflow to continue + /// running indefinitely without having its history grow unbounded. The benefits of periodically + /// truncating history include decreased memory usage, decreased storage volumes, and shorter workflow + /// replays when rebuilding state. + /// + /// The results of any incomplete tasks will be discarded when a workflow calls + /// . For example, if a timer is scheduled and then + /// is called before the timer fires, the timer event will be discarded. The only exception to this + /// is external events. By default, if an external event is received by an workflow but not yet + /// processed, the event is saved in the workflow state unit it is received by a call to + /// . These events will continue to remain in memory + /// even after an workflow restarts using . You can disable this behavior and + /// remove any saved external events by specifying false for the + /// parameter value. + /// + /// Workflow implementations should complete immediately after calling the method. + /// + /// + /// The JSON-serializable input data to re-initialize the instance with. + /// + /// If set to true, re-adds any unprocessed external events into the new execution + /// history when the workflow instance restarts. If false, any unprocessed + /// external events will be discarded when the workflow instance restarts. + /// + public void ContinueAsNew(object? newInput = null, bool preserveUnprocessedEvents = true) + { + this.innerContext.ContinueAsNew(newInput!, preserveUnprocessedEvents); + } + + /// + /// Creates a new GUID that is safe for replay within a workflow. + /// + /// + /// The default implementation of this method creates a name-based UUID V5 using the algorithm from RFC 4122 §4.3. + /// The name input used to generate this value is a combination of the workflow instance ID, the current time, + /// and an internally managed sequence number. + /// + /// The new value. + public Guid NewGuid() + { + return this.innerContext.NewGuid(); } } } diff --git a/src/Dapr.Workflow/WorkflowClient.cs b/src/Dapr.Workflow/WorkflowEngineClient.cs similarity index 70% rename from src/Dapr.Workflow/WorkflowClient.cs rename to src/Dapr.Workflow/WorkflowEngineClient.cs index 597768fa4..640bacd30 100644 --- a/src/Dapr.Workflow/WorkflowClient.cs +++ b/src/Dapr.Workflow/WorkflowEngineClient.cs @@ -1,5 +1,5 @@ // ------------------------------------------------------------------------ -// Copyright 2022 The Dapr Authors +// Copyright 2023 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -11,27 +11,27 @@ // limitations under the License. // ------------------------------------------------------------------------ -using System; -using System.Threading.Tasks; -using Microsoft.DurableTask; -using Microsoft.DurableTask.Client; - namespace Dapr.Workflow { + using System; + using System.Threading.Tasks; + using Microsoft.DurableTask; + using Microsoft.DurableTask.Client; + // TODO: This will be replaced by the official Dapr Workflow management client. /// /// Defines client operations for managing Dapr Workflow instances. /// - public sealed class WorkflowClient : IAsyncDisposable + public sealed class WorkflowEngineClient : IAsyncDisposable { readonly DurableTaskClient innerClient; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// The Durable Task client used to communicate with the Dapr sidecar. /// Thrown if is null. - public WorkflowClient(DurableTaskClient innerClient) + public WorkflowEngineClient(DurableTaskClient innerClient) { this.innerClient = innerClient ?? throw new ArgumentNullException(nameof(innerClient)); } @@ -41,14 +41,14 @@ public WorkflowClient(DurableTaskClient innerClient) /// /// The name of the orchestrator to schedule. /// - /// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. + /// The unique ID of the workflow instance to schedule. If not specified, a new GUID value is used. /// /// - /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past - /// is specified, the orchestration instance will be scheduled immediately. + /// The time when the workflow instance should start executing. If not specified or if a date-time in the past + /// is specified, the workflow instance will be scheduled immediately. /// /// - /// The optional input to pass to the scheduled orchestration instance. This must be a serializable value. + /// The optional input to pass to the scheduled workflow instance. This must be a serializable value. /// public Task ScheduleNewWorkflowAsync( string name, @@ -61,19 +61,19 @@ public Task ScheduleNewWorkflowAsync( } /// - /// Fetches runtime metadata for the specified workflow instance. + /// Fetches runtime state for the specified workflow instance. /// - /// The unique ID of the orchestration instance to fetch. + /// The unique ID of the workflow instance to fetch. /// - /// Specify true to fetch the orchestration instance's inputs, outputs, and custom status, or false to + /// Specify true to fetch the workflow instance's inputs, outputs, and custom status, or false to /// omit them. Defaults to false. /// - public async Task GetWorkflowMetadataAsync(string instanceId, bool getInputsAndOutputs = false) + public async Task GetWorkflowStateAsync(string instanceId, bool getInputsAndOutputs = false) { OrchestrationMetadata? metadata = await this.innerClient.GetInstanceMetadataAsync( instanceId, getInputsAndOutputs); - return new WorkflowMetadata(metadata); + return new WorkflowState(metadata); } /// diff --git a/src/Dapr.Workflow/WorkflowRuntimeOptions.cs b/src/Dapr.Workflow/WorkflowRuntimeOptions.cs index 0bc602311..f4f6691f3 100644 --- a/src/Dapr.Workflow/WorkflowRuntimeOptions.cs +++ b/src/Dapr.Workflow/WorkflowRuntimeOptions.cs @@ -17,6 +17,8 @@ namespace Dapr.Workflow using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.DurableTask; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.Logging; /// /// Defines runtime options for workflows. @@ -28,6 +30,16 @@ public sealed class WorkflowRuntimeOptions /// readonly Dictionary> factories = new(); + /// + /// Initializes a new instance of the class. + /// + /// + /// Instances of this type are expected to be instanciated from a dependency injection container. + /// + public WorkflowRuntimeOptions() + { + } + /// /// Registers a workflow as a function that takes a specified input type and returns a specified output type. /// @@ -46,6 +58,25 @@ public void RegisterWorkflow(string name, Func + /// Registers a workflow class that derives from . + /// + /// The type to register. + public void RegisterWorkflow() where TWorkflow : class, IWorkflow, new() + { + string name = typeof(TWorkflow).Name; + + // Dapr workflows are implemented as specialized Durable Task orchestrations + this.factories.Add(name, (DurableTaskRegistry registry) => + { + registry.AddOrchestrator(name, () => + { + TWorkflow workflow = Activator.CreateInstance(); + return new OrchestratorWrapper(workflow); + }); + }); + } + /// /// Registers a workflow activity as a function that takes a specified input type and returns a specified output type. /// @@ -64,6 +95,26 @@ public void RegisterActivity(string name, Func + /// Registers a workflow activity class that derives from . + /// + /// The type to register. + public void RegisterActivity() where TActivity : class, IWorkflowActivity + { + string name = typeof(TActivity).Name; + + // Dapr workflows are implemented as specialized Durable Task orchestrations + this.factories.Add(name, (DurableTaskRegistry registry) => + { + registry.AddActivity(name, serviceProvider => + { + // Workflow activity classes support dependency injection. + TActivity activity = ActivatorUtilities.CreateInstance(serviceProvider); + return new ActivityWrapper(activity); + }); + }); + } + /// /// Method to add workflows and activities to the registry. /// @@ -75,6 +126,47 @@ internal void AddWorkflowsAndActivitiesToRegistry(DurableTaskRegistry registry) factory.Invoke(registry); // This adds workflows to the registry indirectly. } } + + /// + /// Helper class that provides a Durable Task orchestrator wrapper for a workflow. + /// + class OrchestratorWrapper : ITaskOrchestrator + { + readonly IWorkflow workflow; + + public OrchestratorWrapper(IWorkflow workflow) + { + this.workflow = workflow; + } + + public Type InputType => this.workflow.InputType; + + public Type OutputType => this.workflow.OutputType; + + public Task RunAsync(TaskOrchestrationContext context, object? input) + { + return this.workflow.RunAsync(new WorkflowContext(context), input); + } + } + + class ActivityWrapper : ITaskActivity + { + readonly IWorkflowActivity activity; + + public ActivityWrapper(IWorkflowActivity activity) + { + this.activity = activity; + } + + public Type InputType => this.activity.InputType; + + public Type OutputType => this.activity.OutputType; + + public Task RunAsync(TaskActivityContext context, object? input) + { + return this.activity.RunAsync(new WorkflowActivityContext(context), input); + } + } } } diff --git a/src/Dapr.Workflow/WorkflowMetadata.cs b/src/Dapr.Workflow/WorkflowRuntimeStatus.cs similarity index 51% rename from src/Dapr.Workflow/WorkflowMetadata.cs rename to src/Dapr.Workflow/WorkflowRuntimeStatus.cs index 603b058c6..01198d327 100644 --- a/src/Dapr.Workflow/WorkflowMetadata.cs +++ b/src/Dapr.Workflow/WorkflowRuntimeStatus.cs @@ -1,5 +1,5 @@ // ------------------------------------------------------------------------ -// Copyright 2022 The Dapr Authors +// Copyright 2023 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -11,34 +11,46 @@ // limitations under the License. // ------------------------------------------------------------------------ -using Microsoft.DurableTask.Client; - namespace Dapr.Workflow { /// - /// Represents a snapshot of a workflow instance's current state, including metadata. + /// Enum describing the runtime status of the workflow. /// - public class WorkflowMetadata + public enum WorkflowRuntimeStatus { - internal WorkflowMetadata(OrchestrationMetadata? metadata) - { - this.Details = metadata; - } + /// + /// The status of the workflow is unknown. + /// + Unknown = -1, + + /// + /// The workflow started running. + /// + Running, + + /// + /// The workflow completed normally. + /// + Completed, + + /// + /// The workflow completed with an unhandled exception. + /// + Failed, /// - /// Gets a value indicating whether the requested workflow instance exists. + /// The workflow was abruptly terminated via a management API call. /// - public bool Exists => this.Details != null; + Terminated, /// - /// Gets a value indicating whether the requested workflow is in a running state. + /// The workflow was scheduled but hasn't started running. /// - public bool IsWorkflowRunning => this.Details?.RuntimeStatus == OrchestrationRuntimeStatus.Running; + Pending, /// - /// Gets the detailed metadata for the requested workflow instance. - /// This value will be null when is false. + /// The workflow was suspended. /// - public OrchestrationMetadata? Details { get; } + Suspended, } } diff --git a/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs b/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs index eaee2c670..d14113eeb 100644 --- a/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs +++ b/src/Dapr.Workflow/WorkflowServiceCollectionExtensions.cs @@ -14,12 +14,10 @@ namespace Dapr.Workflow { using System; - using System.Linq; - using Microsoft.Extensions.DependencyInjection; - using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Worker; - using System.Collections.Generic; + using Microsoft.Extensions.DependencyInjection; + using Microsoft.Extensions.DependencyInjection.Extensions; /// /// Contains extension methods for using Dapr Workflow with dependency injection. @@ -41,12 +39,39 @@ public static IServiceCollection AddDaprWorkflow( } serviceCollection.TryAddSingleton(); - serviceCollection.TryAddSingleton(); + serviceCollection.TryAddSingleton(); serviceCollection.AddDaprClient(); + serviceCollection.AddOptions().Configure(configure); + + static bool TryGetGrpcAddress(out string address) + { + // TODO: Ideally we should be using DaprDefaults.cs for this. However, there are two blockers: + // 1. DaprDefaults.cs uses 127.0.0.1 instead of localhost, which prevents testing with Dapr on WSL2 and the app on Windows + // 2. DaprDefaults.cs doesn't compile when the project has C# nullable reference types enabled. + // If the above issues are fixed (ensuring we don't regress anything) we should switch to using the logic in DaprDefaults.cs. + string? daprPortStr = Environment.GetEnvironmentVariable("DAPR_GRPC_PORT"); + if (int.TryParse(Environment.GetEnvironmentVariable("DAPR_GRPC_PORT"), out int daprGrpcPort)) + { + address = $"localhost:{daprGrpcPort}"; + return true; + } + + address = string.Empty; + return false; + } + serviceCollection.AddDurableTaskClient(builder => { - builder.UseGrpc(); + if (TryGetGrpcAddress(out string address)) + { + builder.UseGrpc(address); + } + else + { + builder.UseGrpc(); + } + builder.RegisterDirectly(); }); @@ -55,7 +80,15 @@ public static IServiceCollection AddDaprWorkflow( WorkflowRuntimeOptions options = new(); configure?.Invoke(options); - builder.UseGrpc(); + if (TryGetGrpcAddress(out string address)) + { + builder.UseGrpc(address); + } + else + { + builder.UseGrpc(); + } + builder.AddTasks(registry => options.AddWorkflowsAndActivitiesToRegistry(registry)); }); diff --git a/src/Dapr.Workflow/WorkflowState.cs b/src/Dapr.Workflow/WorkflowState.cs new file mode 100644 index 000000000..62bf5e7bb --- /dev/null +++ b/src/Dapr.Workflow/WorkflowState.cs @@ -0,0 +1,147 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Workflow +{ + using System; + using Microsoft.DurableTask.Client; + + /// + /// Represents a snapshot of a workflow instance's current state, including runtime status. + /// + public class WorkflowState + { + readonly OrchestrationMetadata? workflowState; + + internal WorkflowState(OrchestrationMetadata? workflowState) + { + // This value will be null if the workflow doesn't exist. + this.workflowState = workflowState; + } + + /// + /// Gets a value indicating whether the requested workflow instance exists. + /// + public bool Exists => this.workflowState != null; + + /// + /// Gets a value indicating whether the requested workflow is in a running state. + /// + public bool IsWorkflowRunning => this.workflowState?.RuntimeStatus == OrchestrationRuntimeStatus.Running; + + /// + /// Gets a value indicating whether the requested workflow is in a terminal state. + /// + public bool IsWorkflowCompleted => this.workflowState?.IsCompleted == true; + + /// + /// Gets the time at which this workflow instance was created. + /// + public DateTimeOffset CreatedAt => this.workflowState?.CreatedAt ?? default; + + /// + /// Gets the time at which this workflow instance last had its state updated. + /// + public DateTimeOffset LastUpdatedAt => this.workflowState?.LastUpdatedAt ?? default; + + /// + /// Gets the execution status of the workflow. + /// + public WorkflowRuntimeStatus RuntimeStatus + { + get + { + if (this.workflowState == null) + { + return WorkflowRuntimeStatus.Unknown; + } + + switch (this.workflowState.RuntimeStatus) + { + case OrchestrationRuntimeStatus.Running: + return WorkflowRuntimeStatus.Running; + case OrchestrationRuntimeStatus.Completed: + return WorkflowRuntimeStatus.Completed; + case OrchestrationRuntimeStatus.Failed: + return WorkflowRuntimeStatus.Failed; + case OrchestrationRuntimeStatus.Terminated: + return WorkflowRuntimeStatus.Terminated; + case OrchestrationRuntimeStatus.Pending: + return WorkflowRuntimeStatus.Pending; + default: + return WorkflowRuntimeStatus.Unknown; + } + } + } + + /// + /// Deserializes the workflow input into . + /// + /// The type to deserialize the workflow input into. + /// Returns the input as , or returns a default value if the workflow doesn't exist. + public T? ReadInputAs() + { + if (this.workflowState == null) + { + return default; + } + + if (string.IsNullOrEmpty(this.workflowState.SerializedInput)) + { + return default; + } + + return this.workflowState.ReadInputAs(); + } + + /// + /// Deserializes the workflow output into . + /// + /// The type to deserialize the workflow output into. + /// Returns the output as , or returns a default value if the workflow doesn't exist. + public T? ReadOutputAs() + { + if (this.workflowState == null) + { + return default; + } + + if (string.IsNullOrEmpty(this.workflowState.SerializedOutput)) + { + return default; + } + + return this.workflowState.ReadOutputAs(); + } + + /// + /// Deserializes the workflow's custom status into . + /// + /// The type to deserialize the workflow's custom status into. + /// Returns the custom status as , or returns a default value if the workflow doesn't exist. + public T? ReadCustomStatusAs() + { + if (this.workflowState == null) + { + return default; + } + + if (string.IsNullOrEmpty(this.workflowState.SerializedCustomStatus)) + { + return default; + } + + return this.workflowState.ReadCustomStatusAs(); + } + } +}