Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 17 additions & 11 deletions examples/Workflow/WorkflowConsoleApp/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
using WorkflowConsoleApp.Models;
using WorkflowConsoleApp.Workflows;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;

const string storeName = "statestore";
const string StoreName = "statestore";
const string WorkflowComponent = "dapr";

// The workflow host is a background service that connects to the sidecar over gRPC
var builder = Host.CreateDefaultBuilder(args).ConfigureServices(services =>
Expand Down Expand Up @@ -58,8 +58,6 @@
// This is just to make the log output look a little nicer.
Thread.Sleep(TimeSpan.FromSeconds(1));

DaprWorkflowClient workflowClient = host.Services.GetRequiredService<DaprWorkflowClient>();

var baseInventory = new List<InventoryItem>
{
new InventoryItem(Name: "Paperclips", PerItemCost: 5, Quantity: 100),
Expand Down Expand Up @@ -114,23 +112,31 @@

// Start the workflow using the order ID as the workflow ID
Console.WriteLine($"Starting order workflow '{orderId}' purchasing {amount} {itemName}");
await workflowClient.ScheduleNewWorkflowAsync(
name: nameof(OrderProcessingWorkflow),
await daprClient.StartWorkflowAsync(
WorkflowComponent,
Comment thread
cgillum marked this conversation as resolved.
Outdated
workflowName: nameof(OrderProcessingWorkflow),
input: orderInfo,
instanceId: orderId);

// Wait for the workflow to start and confirm the input
GetWorkflowResponse state = await daprClient.WaitForWorkflowStartAsync(
instanceId: orderId,
input: orderInfo);
workflowComponent: WorkflowComponent);

Console.WriteLine($"{state.WorkflowName} (ID = {orderId}) started successfully with {state.ReadInputAs<OrderPayload>()}");

// Wait for the workflow to complete
WorkflowState state = await workflowClient.WaitForWorkflowCompletionAsync(
state = await daprClient.WaitForWorkflowCompletionAsync(
instanceId: orderId,
getInputsAndOutputs: true);
workflowComponent: WorkflowComponent);

if (state.RuntimeStatus == WorkflowRuntimeStatus.Completed)
{
OrderResult result = state.ReadOutputAs<OrderResult>();
if (result.Processed)
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"Order workflow is {state.RuntimeStatus} and the order was processed successfully.");
Console.WriteLine($"Order workflow is {state.RuntimeStatus} and the order was processed successfully ({result}).");
Console.ResetColor();
}
else
Expand All @@ -154,6 +160,6 @@ static async Task RestockInventory(DaprClient daprClient, List<InventoryItem> in
foreach (var item in inventory)
{
Console.WriteLine($"*** \t{item.Name}: {item.Quantity}");
await daprClient.SaveStateAsync(storeName, item.Name.ToLowerInvariant(), item);
await daprClient.SaveStateAsync(StoreName, item.Name.ToLowerInvariant(), item);
}
}
81 changes: 77 additions & 4 deletions src/Dapr.Client/DaprClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -978,22 +978,95 @@ public abstract Task<UnlockResponse> Unlock(
/// <summary>
/// Attempt to start the given workflow with response indicating success.
/// </summary>
/// <param name="instanceId">Identifier of the specific run.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="workflowName">Name of the workflow to run.</param>
/// <param name="instanceId">Identifier of the specific run.</param>
/// <param name="input">The JSON-serializeable input for the given workflow.</param>
/// <param name="workflowOptions">The list of options that are potentially needed to start a workflow.</param>
/// <param name="input">The input input for the given workflow.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task"/> containing a <see cref="StartWorkflowResponse"/></returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task<StartWorkflowResponse> StartWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
Object input,
string instanceId = null,
object input = null,
IReadOnlyDictionary<string, string> workflowOptions = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Waits for a workflow to start running and returns a <see cref="GetWorkflowResponse"/> object that contains metadata
/// about the started workflow.
/// </summary>
/// <remarks>
/// <para>
/// A "started" workflow instance is any instance not in the <see cref="WorkflowRuntimeStatus.Pending"/> state.
/// </para><para>
/// This method will return a completed task if the workflow has already started running or has already completed.
/// </para>
/// </remarks>
/// <param name="instanceId">The unique ID of the workflow instance to wait for.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> that can be used to cancel the wait operation.</param>
/// <returns>
/// Returns a <see cref="GetWorkflowResponse"/> record that describes the workflow instance and its execution status.
/// </returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public virtual async Task<GetWorkflowResponse> WaitForWorkflowStartAsync(
Comment thread
halspang marked this conversation as resolved.
string instanceId,
string workflowComponent,
CancellationToken cancellationToken = default)
{
while (true)
{
GetWorkflowResponse response = await this.GetWorkflowAsync(instanceId, workflowComponent, cancellationToken);
Comment thread
cgillum marked this conversation as resolved.
Outdated
if (response.RuntimeStatus != WorkflowRuntimeStatus.Pending)
{
return response;
}

await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
}
}

/// <summary>
/// Waits for a workflow to complete and returns a <see cref="GetWorkflowResponse"/>
/// object that contains metadata about the started instance.
/// </summary>
/// <remarks>
/// <para>
/// A "completed" workflow instance is any instance in one of the terminal states. For example, the
/// <see cref="WorkflowRuntimeStatus.Completed"/>, <see cref="WorkflowRuntimeStatus.Failed"/>, or
/// <see cref="WorkflowRuntimeStatus.Terminated"/> states.
/// </para><para>
/// Workflows are long-running and could take hours, days, or months before completing.
/// Workflows can also be eternal, in which case they'll never complete unless terminated.
/// In such cases, this call may block indefinitely, so care must be taken to ensure appropriate timeouts are
/// enforced using the <paramref name="cancellationToken"/> parameter.
/// </para><para>
/// If a workflow instance is already complete when this method is called, the method will return immediately.
/// </para>
/// </remarks>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public virtual async Task<GetWorkflowResponse> WaitForWorkflowCompletionAsync(
string instanceId,
string workflowComponent,
CancellationToken cancellationToken = default)
{
while (true)
{
GetWorkflowResponse response = await this.GetWorkflowAsync(instanceId, workflowComponent, cancellationToken);
Comment thread
cgillum marked this conversation as resolved.
Outdated
if (response.RuntimeStatus == WorkflowRuntimeStatus.Completed ||
response.RuntimeStatus == WorkflowRuntimeStatus.Failed ||
response.RuntimeStatus == WorkflowRuntimeStatus.Terminated)
{
return response;
}

await Task.Delay(TimeSpan.FromMilliseconds(500), cancellationToken);
}
}

/// <summary>
/// Attempt to get information about the given workflow.
/// </summary>
Expand Down
64 changes: 45 additions & 19 deletions src/Dapr.Client/DaprClientGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1467,10 +1467,10 @@ public async override Task<UnlockResponse> Unlock(
/// <inheritdoc/>
[Obsolete]
public async override Task<StartWorkflowResponse> StartWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
Object input,
string instanceId = null,
object input = null,
IReadOnlyDictionary<string, string> workflowOptions = default,
CancellationToken cancellationToken = default)
{
Expand All @@ -1480,14 +1480,18 @@ public async override Task<StartWorkflowResponse> StartWorkflowAsync(
ArgumentVerifier.ThrowIfNull(input, nameof(input));

// Serialize json data. Converts input object to bytes and then bytestring inside the request.
var jsonUtf8Bytes = JsonSerializer.SerializeToUtf8Bytes(input);
byte[] jsonUtf8Bytes = null;
if (input is not null)
{
jsonUtf8Bytes = JsonSerializer.SerializeToUtf8Bytes(input);
}

var request = new Autogenerated.StartWorkflowRequest()
{
InstanceId = instanceId,
WorkflowComponent = workflowComponent,
WorkflowName = workflowName,
Input = ByteString.CopyFrom(jsonUtf8Bytes),
Input = jsonUtf8Bytes is not null ? ByteString.CopyFrom(jsonUtf8Bytes) : null,
};

if (workflowOptions?.Count > 0)
Expand Down Expand Up @@ -1533,31 +1537,53 @@ public async override Task<GetWorkflowResponse> GetWorkflowAsync(
var response = await client.GetWorkflowAlpha1Async(request, options);
if (response == null)
{
throw new DaprException("Get workflow operation failed: the object response is null");
}
if (response.CreatedAt == null)
{
response.CreatedAt = new Timestamp();
throw new DaprException("Get workflow operation failed: CreatedAt object response is null");
throw new DaprException("Get workflow operation failed: the Dapr endpoint returned an empty result.");
}
if (response.LastUpdatedAt == null)

response.CreatedAt ??= new Timestamp();
response.LastUpdatedAt ??= response.CreatedAt;

return new GetWorkflowResponse
{
response.LastUpdatedAt = response.CreatedAt;
}
return new GetWorkflowResponse(response.InstanceId,
response.WorkflowName,
response.CreatedAt.ToDateTime(),
response.LastUpdatedAt.ToDateTime(),
response.RuntimeStatus,
response.Properties);
InstanceId = response.InstanceId,
WorkflowName = response.WorkflowName,
WorkflowComponentName = workflowComponent,
CreatedAt = response.CreatedAt.ToDateTime(),
LastUpdatedAt = response.LastUpdatedAt.ToDateTime(),
RuntimeStatus = GetWorkflowRuntimeStatus(response.RuntimeStatus),
Properties = response.Properties,
FailureDetails = GetWorkflowFailureDetails(response, workflowComponent),
};
}
catch (RpcException ex)
{
throw new DaprException("Get workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}
}

private static WorkflowRuntimeStatus GetWorkflowRuntimeStatus(string runtimeStatus)
{
if (!System.Enum.TryParse(runtimeStatus, true /* ignoreCase */, out WorkflowRuntimeStatus status))
{
status = WorkflowRuntimeStatus.Unknown;
}

return status;
}

private static WorkflowFailureDetails GetWorkflowFailureDetails(Autogenerated.GetWorkflowResponse response, string componentName)
{
// FUTURE: Make this part of the protobuf contract instead of getting it from properties
// NOTE: The use of | instead of || is intentional. We want to get all the values.
if (response.Properties.TryGetValue($"{componentName}.workflow.failure.error_type", out string errorType) |
response.Properties.TryGetValue($"{componentName}.workflow.failure.error_message", out string errorMessage) |
response.Properties.TryGetValue($"{componentName}.workflow.failure.stack_trace", out string stackTrace))
{
return new WorkflowFailureDetails(errorMessage, errorType, stackTrace);
}

return null;
}

/// <inheritdoc/>
[Obsolete]
Expand Down
104 changes: 85 additions & 19 deletions src/Dapr.Client/GetWorkflowResponse.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -11,25 +11,91 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace Dapr.Client
{
using System;
using System.Collections.Generic;
using System.Text.Json;
Comment thread
cgillum marked this conversation as resolved.
Outdated

/// <summary>
/// The response type for the <see cref="DaprClient.GetWorkflowAsync"/> API.
/// </summary>
public class GetWorkflowResponse
{
/// <summary>
/// Gets the instance ID of the workflow.
/// </summary>
public string InstanceId { get; init; }

/// <summary>
/// Gets the name of the workflow.
/// </summary>
public string WorkflowName { get; init; }

/// <summary>
/// Gets the name of the workflow component.
/// </summary>
public string WorkflowComponentName { get; init; }

/// <summary>
/// Gets the time at which the workflow was created.
/// </summary>
public DateTime CreatedAt { get; init; }

/// <summary>
/// Gets the time at which the workflow was last updated.
/// </summary>
public DateTime LastUpdatedAt { get; init; }

/// <summary>
/// Gets the runtime status of the workflow.
/// </summary>
public WorkflowRuntimeStatus RuntimeStatus { get; init; }

/// <summary>
/// Gets the component-specific workflow properties.
/// </summary>
public IReadOnlyDictionary<string, string> Properties { get; init; }

/// <summary>
/// Gets the details associated with the workflow failure, if any.
/// </summary>
public WorkflowFailureDetails FailureDetails { get; init; }

/// <summary>
/// Deserializes the workflow input into <typeparamref name="T"/> using <see cref="JsonSerializer"/>.
/// </summary>
/// <typeparam name="T">The type to deserialize the workflow input into.</typeparam>
/// <param name="options">Options to control the behavior during parsing.</param>
/// <returns>Returns the input as <typeparamref name="T"/>, or returns a default value if the workflow doesn't have an input.</returns>
public T ReadInputAs<T>(JsonSerializerOptions options = null)
{
// FUTURE: Make this part of the protobuf contract instead of properties
string defaultInputKey = $"{this.WorkflowComponentName}.workflow.input";
if (!this.Properties.TryGetValue(defaultInputKey, out string serializedInput))
{
return default;
}

return JsonSerializer.Deserialize<T>(serializedInput, options);
}

/// <summary>
/// Initializes a new <see cref="GetWorkflowResponse" />.
/// </summary>
/// <param name="InstanceId">The instance ID associated with this response.</param>
/// <param name="WorkflowName">The name of the workflow associated with this response.</param>
/// <param name="CreatedAt">The time at which the workflow started executing.</param>
/// <param name="LastUpdatedAt">The time at which the workflow started executing.</param>
/// <param name="RuntimeStatus">The current runtime status of the workflow.</param>
/// <param name="Properties">The response properties.</param>
public record GetWorkflowResponse(
string InstanceId,
string WorkflowName,
DateTime CreatedAt,
DateTime LastUpdatedAt,
string RuntimeStatus,
IReadOnlyDictionary<string, string> Properties);
/// Deserializes the workflow output into <typeparamref name="T"/> using <see cref="JsonSerializer"/>.
/// </summary>
/// <typeparam name="T">The type to deserialize the workflow output into.</typeparam>
/// <param name="options">Options to control the behavior during parsing.</param>
/// <returns>Returns the output as <typeparamref name="T"/>, or returns a default value if the workflow doesn't have an output.</returns>
public T ReadOutputAs<T>(JsonSerializerOptions options = null)
{
// FUTURE: Make this part of the protobuf contract instead of properties
string defaultOutputKey = $"{this.WorkflowComponentName}.workflow.output";
if (!this.Properties.TryGetValue(defaultOutputKey, out string serializedOutput))
{
return default;
}

return JsonSerializer.Deserialize<T>(serializedOutput, options);
}
}
}
Loading