Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/Dapr.Workflow/Client/ProtoConverters.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,42 @@ public static WorkflowRuntimeStatus ToRuntimeStatus(OrchestrationStatus status)
OrchestrationStatus.Stalled => WorkflowRuntimeStatus.Stalled,
_ => WorkflowRuntimeStatus.Unknown
};

/// <summary>
/// Converts a proto <see cref="HistoryEvent"/> to <see cref="WorkflowHistoryEvent"/>.
/// </summary>
public static WorkflowHistoryEvent ToWorkflowHistoryEvent(HistoryEvent historyEvent) =>
new(historyEvent.EventId,
ToHistoryEventType(historyEvent.EventTypeCase),
historyEvent.Timestamp?.ToDateTime() ?? DateTime.MinValue);

/// <summary>
/// Converts the proto history event type to <see cref="WorkflowHistoryEventType"/>.
/// </summary>
public static WorkflowHistoryEventType ToHistoryEventType(HistoryEvent.EventTypeOneofCase eventType)
=> eventType switch
{
HistoryEvent.EventTypeOneofCase.ExecutionStarted => WorkflowHistoryEventType.ExecutionStarted,
HistoryEvent.EventTypeOneofCase.ExecutionCompleted => WorkflowHistoryEventType.ExecutionCompleted,
HistoryEvent.EventTypeOneofCase.ExecutionTerminated => WorkflowHistoryEventType.ExecutionTerminated,
HistoryEvent.EventTypeOneofCase.TaskScheduled => WorkflowHistoryEventType.TaskScheduled,
HistoryEvent.EventTypeOneofCase.TaskCompleted => WorkflowHistoryEventType.TaskCompleted,
HistoryEvent.EventTypeOneofCase.TaskFailed => WorkflowHistoryEventType.TaskFailed,
HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated => WorkflowHistoryEventType.SubOrchestrationInstanceCreated,
HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted => WorkflowHistoryEventType.SubOrchestrationInstanceCompleted,
HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed => WorkflowHistoryEventType.SubOrchestrationInstanceFailed,
HistoryEvent.EventTypeOneofCase.TimerCreated => WorkflowHistoryEventType.TimerCreated,
HistoryEvent.EventTypeOneofCase.TimerFired => WorkflowHistoryEventType.TimerFired,
HistoryEvent.EventTypeOneofCase.OrchestratorStarted => WorkflowHistoryEventType.OrchestratorStarted,
HistoryEvent.EventTypeOneofCase.OrchestratorCompleted => WorkflowHistoryEventType.OrchestratorCompleted,
HistoryEvent.EventTypeOneofCase.EventSent => WorkflowHistoryEventType.EventSent,
HistoryEvent.EventTypeOneofCase.EventRaised => WorkflowHistoryEventType.EventRaised,
HistoryEvent.EventTypeOneofCase.GenericEvent => WorkflowHistoryEventType.GenericEvent,
HistoryEvent.EventTypeOneofCase.HistoryState => WorkflowHistoryEventType.HistoryState,
HistoryEvent.EventTypeOneofCase.ContinueAsNew => WorkflowHistoryEventType.ContinueAsNew,
HistoryEvent.EventTypeOneofCase.ExecutionSuspended => WorkflowHistoryEventType.ExecutionSuspended,
HistoryEvent.EventTypeOneofCase.ExecutionResumed => WorkflowHistoryEventType.ExecutionResumed,
HistoryEvent.EventTypeOneofCase.ExecutionStalled => WorkflowHistoryEventType.ExecutionStalled,
_ => WorkflowHistoryEventType.Unknown
};
}
38 changes: 38 additions & 0 deletions src/Dapr.Workflow/Client/RerunWorkflowFromEventOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// ------------------------------------------------------------------------
// Copyright 2026 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

namespace Dapr.Workflow.Client;

/// <summary>
/// Options for the <see cref="Dapr.Workflow.IDaprWorkflowClient.RerunWorkflowFromEventAsync"/> operation.
/// </summary>
public sealed class RerunWorkflowFromEventOptions
{
/// <summary>
/// Gets or sets the new instance ID to use for the rerun workflow instance.
/// If not specified, a random instance ID will be generated.
/// </summary>
public string? NewInstanceId { get; set; }

/// <summary>
/// Gets or sets the optional input to provide when rerunning the workflow, applied at the
/// next activity event. When set, <see cref="OverwriteInput"/> must also be set to <c>true</c>.
/// </summary>
public object? Input { get; set; }

/// <summary>
/// Gets or sets a value indicating whether the workflow's input at the rerun point (for the
/// next activity event) should be overwritten with <see cref="Input"/>.
/// </summary>
public bool OverwriteInput { get; set; }
}
37 changes: 37 additions & 0 deletions src/Dapr.Workflow/Client/WorkflowClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -182,6 +183,42 @@ public abstract Task ResumeWorkflowAsync(string instanceId, string? reason = nul
/// <c>false</c>.</returns>
public abstract Task<bool> PurgeInstanceAsync(string instanceId, CancellationToken cancellationToken = default);

/// <summary>
/// Lists workflow instance IDs with optional pagination.
/// </summary>
/// <param name="continuationToken">The continuation token from a previous call, or <c>null</c> for the first page.</param>
/// <param name="pageSize">The maximum number of instance IDs to return, or <c>null</c> for no limit.</param>
/// <param name="cancellationToken">A token that can be used to cancel the operation.</param>
/// <returns>A page of instance IDs and an optional continuation token for the next page.</returns>
public abstract Task<WorkflowInstancePage> ListInstanceIdsAsync(
string? continuationToken = null,
int? pageSize = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Gets the full history of a workflow instance.
/// </summary>
/// <param name="instanceId">The instance ID of the workflow to get history for.</param>
/// <param name="cancellationToken">A token that can be used to cancel the operation.</param>
/// <returns>The list of history events for the workflow instance.</returns>
public abstract Task<IReadOnlyList<WorkflowHistoryEvent>> GetInstanceHistoryAsync(
string instanceId,
CancellationToken cancellationToken = default);

/// <summary>
/// Reruns a workflow from a specific event ID, creating a new workflow instance.
/// </summary>
/// <param name="sourceInstanceId">The instance ID of the source workflow to rerun from.</param>
/// <param name="eventId">The event ID to rerun from.</param>
/// <param name="options">Optional configuration for the rerun operation.</param>
/// <param name="cancellationToken">A token that can be used to cancel the operation.</param>
/// <returns>The instance ID of the new workflow instance.</returns>
public abstract Task<string> RerunWorkflowFromEventAsync(
string sourceInstanceId,
uint eventId,
RerunWorkflowFromEventOptions? options = null,
CancellationToken cancellationToken = default);

/// <inheritdoc />
public abstract ValueTask DisposeAsync();
}
97 changes: 97 additions & 0 deletions src/Dapr.Workflow/Client/WorkflowGrpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Common;
Expand Down Expand Up @@ -224,6 +226,101 @@ public override async Task<bool> PurgeInstanceAsync(string instanceId, Cancellat
return purged;
}

/// <inheritdoc />
public override async Task<WorkflowInstancePage> ListInstanceIdsAsync(
string? continuationToken = null,
int? pageSize = null,
CancellationToken cancellationToken = default)
{
var request = new grpc.ListInstanceIDsRequest();

if (continuationToken is not null)
{
request.ContinuationToken = continuationToken;
}

if (pageSize.HasValue)
{
Comment thread
JoshVanL marked this conversation as resolved.
ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(pageSize.Value, 0, nameof(pageSize));
request.PageSize = (uint)pageSize.Value;
}

var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.ListInstanceIDsAsync(request, grpcCallOptions);

logger.LogListInstanceIds(response.InstanceIds.Count);

return new WorkflowInstancePage(
response.InstanceIds.ToList().AsReadOnly(),
response.HasContinuationToken ? response.ContinuationToken : null);
}

/// <inheritdoc />
public override async Task<IReadOnlyList<WorkflowHistoryEvent>> GetInstanceHistoryAsync(
string instanceId,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrEmpty(instanceId);

var request = new grpc.GetInstanceHistoryRequest
{
InstanceId = instanceId
};

var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.GetInstanceHistoryAsync(request, grpcCallOptions);

var events = response.Events
.Select(ProtoConverters.ToWorkflowHistoryEvent)
.ToList()
.AsReadOnly();

logger.LogGetInstanceHistory(instanceId, events.Count);

return events;
}

/// <inheritdoc />
public override async Task<string> RerunWorkflowFromEventAsync(
string sourceInstanceId,
uint eventId,
RerunWorkflowFromEventOptions? options = null,
CancellationToken cancellationToken = default)
{
ArgumentException.ThrowIfNullOrEmpty(sourceInstanceId);

if (options is { Input: not null, OverwriteInput: false })
{
throw new ArgumentException(
$"{nameof(RerunWorkflowFromEventOptions.OverwriteInput)} must be true when {nameof(RerunWorkflowFromEventOptions.Input)} is set.",
nameof(options));
}

var request = new grpc.RerunWorkflowFromEventRequest
{
SourceInstanceID = sourceInstanceId,
EventID = eventId,
OverwriteInput = options?.OverwriteInput ?? false
};
Comment thread
JoshVanL marked this conversation as resolved.

if (options?.NewInstanceId is not null)
{
request.NewInstanceID = options.NewInstanceId;
}

if (options is { OverwriteInput: true })
{
request.Input = SerializeToJson(options.Input);
}
Comment thread
WhitWaldo marked this conversation as resolved.

var grpcCallOptions = CreateCallOptions(cancellationToken);
var response = await grpcClient.RerunWorkflowFromEventAsync(request, grpcCallOptions);

logger.LogRerunWorkflowFromEvent(sourceInstanceId, eventId, response.NewInstanceID);

return response.NewInstanceID;
}

/// <inheritdoc />
public override ValueTask DisposeAsync()
{
Expand Down
99 changes: 99 additions & 0 deletions src/Dapr.Workflow/Client/WorkflowHistoryEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// ------------------------------------------------------------------------
// Copyright 2026 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System;

namespace Dapr.Workflow.Client;

/// <summary>
/// Represents a single event in a workflow instance's history.
/// </summary>
/// <param name="EventId">The unique event ID within the workflow instance history.</param>
/// <param name="EventType">The type of history event.</param>
/// <param name="Timestamp">The timestamp when the event occurred.</param>
public sealed record WorkflowHistoryEvent(
int EventId,
WorkflowHistoryEventType EventType,
DateTime Timestamp);

/// <summary>
/// Represents the type of a workflow history event.
/// </summary>
public enum WorkflowHistoryEventType
{
/// <summary>Unknown event type.</summary>
Unknown = 0,

/// <summary>The workflow execution started.</summary>
ExecutionStarted,

/// <summary>The workflow execution completed.</summary>
ExecutionCompleted,

/// <summary>The workflow execution was terminated.</summary>
ExecutionTerminated,

/// <summary>A task (activity) was scheduled.</summary>
TaskScheduled,

/// <summary>A task (activity) completed successfully.</summary>
TaskCompleted,

/// <summary>A task (activity) failed.</summary>
TaskFailed,

/// <summary>A sub-orchestration instance was created.</summary>
SubOrchestrationInstanceCreated,

/// <summary>A sub-orchestration instance completed.</summary>
SubOrchestrationInstanceCompleted,

/// <summary>A sub-orchestration instance failed.</summary>
SubOrchestrationInstanceFailed,

/// <summary>A timer was created.</summary>
TimerCreated,

/// <summary>A timer fired.</summary>
TimerFired,

/// <summary>An orchestrator started processing.</summary>
OrchestratorStarted,

/// <summary>An orchestrator completed processing.</summary>
OrchestratorCompleted,

/// <summary>An event was sent to another instance.</summary>
EventSent,

/// <summary>An external event was raised.</summary>
EventRaised,

/// <summary>A generic event.</summary>
GenericEvent,

/// <summary>A history state event.</summary>
HistoryState,

/// <summary>The workflow continued as new.</summary>
ContinueAsNew,

/// <summary>The workflow execution was suspended.</summary>
ExecutionSuspended,

/// <summary>The workflow execution was resumed.</summary>
ExecutionResumed,

/// <summary>The workflow execution stalled.</summary>
ExecutionStalled,
}
27 changes: 27 additions & 0 deletions src/Dapr.Workflow/Client/WorkflowInstancePage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// ------------------------------------------------------------------------
// Copyright 2026 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System.Collections.Generic;

namespace Dapr.Workflow.Client;

/// <summary>
/// Represents a page of workflow instance IDs returned by a list operation.
/// </summary>
/// <param name="InstanceIds">The workflow instance IDs in this page.</param>
/// <param name="ContinuationToken">
/// The continuation token used to retrieve the next page, or <c>null</c> if there are no more pages.
/// </param>
public sealed record WorkflowInstancePage(
IReadOnlyList<string> InstanceIds,
string? ContinuationToken);
Loading
Loading