diff --git a/src/Dapr.Workflow/Client/ProtoConverters.cs b/src/Dapr.Workflow/Client/ProtoConverters.cs index 3a5a65312..a6362fae9 100644 --- a/src/Dapr.Workflow/Client/ProtoConverters.cs +++ b/src/Dapr.Workflow/Client/ProtoConverters.cs @@ -52,4 +52,42 @@ public static WorkflowRuntimeStatus ToRuntimeStatus(OrchestrationStatus status) OrchestrationStatus.Stalled => WorkflowRuntimeStatus.Stalled, _ => WorkflowRuntimeStatus.Unknown }; + + /// + /// Converts a proto to . + /// + public static WorkflowHistoryEvent ToWorkflowHistoryEvent(HistoryEvent historyEvent) => + new(historyEvent.EventId, + ToHistoryEventType(historyEvent.EventTypeCase), + historyEvent.Timestamp?.ToDateTime() ?? DateTime.MinValue); + + /// + /// Converts the proto history event type to . + /// + public static WorkflowHistoryEventType ToHistoryEventType(HistoryEvent.EventTypeOneofCase eventType) + => eventType switch + { + HistoryEvent.EventTypeOneofCase.ExecutionStarted => WorkflowHistoryEventType.ExecutionStarted, + HistoryEvent.EventTypeOneofCase.ExecutionCompleted => WorkflowHistoryEventType.ExecutionCompleted, + HistoryEvent.EventTypeOneofCase.ExecutionTerminated => WorkflowHistoryEventType.ExecutionTerminated, + HistoryEvent.EventTypeOneofCase.TaskScheduled => WorkflowHistoryEventType.TaskScheduled, + HistoryEvent.EventTypeOneofCase.TaskCompleted => WorkflowHistoryEventType.TaskCompleted, + HistoryEvent.EventTypeOneofCase.TaskFailed => WorkflowHistoryEventType.TaskFailed, + HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated => WorkflowHistoryEventType.SubOrchestrationInstanceCreated, + HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted => WorkflowHistoryEventType.SubOrchestrationInstanceCompleted, + HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed => WorkflowHistoryEventType.SubOrchestrationInstanceFailed, + HistoryEvent.EventTypeOneofCase.TimerCreated => WorkflowHistoryEventType.TimerCreated, + HistoryEvent.EventTypeOneofCase.TimerFired => WorkflowHistoryEventType.TimerFired, + HistoryEvent.EventTypeOneofCase.OrchestratorStarted => WorkflowHistoryEventType.OrchestratorStarted, + HistoryEvent.EventTypeOneofCase.OrchestratorCompleted => WorkflowHistoryEventType.OrchestratorCompleted, + HistoryEvent.EventTypeOneofCase.EventSent => WorkflowHistoryEventType.EventSent, + HistoryEvent.EventTypeOneofCase.EventRaised => WorkflowHistoryEventType.EventRaised, + HistoryEvent.EventTypeOneofCase.GenericEvent => WorkflowHistoryEventType.GenericEvent, + HistoryEvent.EventTypeOneofCase.HistoryState => WorkflowHistoryEventType.HistoryState, + HistoryEvent.EventTypeOneofCase.ContinueAsNew => WorkflowHistoryEventType.ContinueAsNew, + HistoryEvent.EventTypeOneofCase.ExecutionSuspended => WorkflowHistoryEventType.ExecutionSuspended, + HistoryEvent.EventTypeOneofCase.ExecutionResumed => WorkflowHistoryEventType.ExecutionResumed, + HistoryEvent.EventTypeOneofCase.ExecutionStalled => WorkflowHistoryEventType.ExecutionStalled, + _ => WorkflowHistoryEventType.Unknown + }; } diff --git a/src/Dapr.Workflow/Client/RerunWorkflowFromEventOptions.cs b/src/Dapr.Workflow/Client/RerunWorkflowFromEventOptions.cs new file mode 100644 index 000000000..9016b5958 --- /dev/null +++ b/src/Dapr.Workflow/Client/RerunWorkflowFromEventOptions.cs @@ -0,0 +1,38 @@ +// ------------------------------------------------------------------------ +// Copyright 2026 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Workflow.Client; + +/// +/// Options for the operation. +/// +public sealed class RerunWorkflowFromEventOptions +{ + /// + /// Gets or sets the new instance ID to use for the rerun workflow instance. + /// If not specified, a random instance ID will be generated. + /// + public string? NewInstanceId { get; set; } + + /// + /// Gets or sets the optional input to provide when rerunning the workflow, applied at the + /// next activity event. When set, must also be set to true. + /// + public object? Input { get; set; } + + /// + /// Gets or sets a value indicating whether the workflow's input at the rerun point (for the + /// next activity event) should be overwritten with . + /// + public bool OverwriteInput { get; set; } +} diff --git a/src/Dapr.Workflow/Client/WorkflowClient.cs b/src/Dapr.Workflow/Client/WorkflowClient.cs index 91c2fbeb9..687f288ff 100644 --- a/src/Dapr.Workflow/Client/WorkflowClient.cs +++ b/src/Dapr.Workflow/Client/WorkflowClient.cs @@ -12,6 +12,7 @@ // ------------------------------------------------------------------------ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -182,6 +183,42 @@ public abstract Task ResumeWorkflowAsync(string instanceId, string? reason = nul /// false. public abstract Task PurgeInstanceAsync(string instanceId, CancellationToken cancellationToken = default); + /// + /// Lists workflow instance IDs with optional pagination. + /// + /// The continuation token from a previous call, or null for the first page. + /// The maximum number of instance IDs to return, or null for no limit. + /// A token that can be used to cancel the operation. + /// A page of instance IDs and an optional continuation token for the next page. + public abstract Task ListInstanceIdsAsync( + string? continuationToken = null, + int? pageSize = null, + CancellationToken cancellationToken = default); + + /// + /// Gets the full history of a workflow instance. + /// + /// The instance ID of the workflow to get history for. + /// A token that can be used to cancel the operation. + /// The list of history events for the workflow instance. + public abstract Task> GetInstanceHistoryAsync( + string instanceId, + CancellationToken cancellationToken = default); + + /// + /// Reruns a workflow from a specific event ID, creating a new workflow instance. + /// + /// The instance ID of the source workflow to rerun from. + /// The event ID to rerun from. + /// Optional configuration for the rerun operation. + /// A token that can be used to cancel the operation. + /// The instance ID of the new workflow instance. + public abstract Task RerunWorkflowFromEventAsync( + string sourceInstanceId, + uint eventId, + RerunWorkflowFromEventOptions? options = null, + CancellationToken cancellationToken = default); + /// public abstract ValueTask DisposeAsync(); } diff --git a/src/Dapr.Workflow/Client/WorkflowGrpcClient.cs b/src/Dapr.Workflow/Client/WorkflowGrpcClient.cs index f6c46e95d..64e0b713d 100644 --- a/src/Dapr.Workflow/Client/WorkflowGrpcClient.cs +++ b/src/Dapr.Workflow/Client/WorkflowGrpcClient.cs @@ -12,6 +12,8 @@ // ------------------------------------------------------------------------ using System; +using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Dapr.Common; @@ -224,6 +226,101 @@ public override async Task PurgeInstanceAsync(string instanceId, Cancellat return purged; } + /// + public override async Task ListInstanceIdsAsync( + string? continuationToken = null, + int? pageSize = null, + CancellationToken cancellationToken = default) + { + var request = new grpc.ListInstanceIDsRequest(); + + if (continuationToken is not null) + { + request.ContinuationToken = continuationToken; + } + + if (pageSize.HasValue) + { + ArgumentOutOfRangeException.ThrowIfLessThanOrEqual(pageSize.Value, 0, nameof(pageSize)); + request.PageSize = (uint)pageSize.Value; + } + + var grpcCallOptions = CreateCallOptions(cancellationToken); + var response = await grpcClient.ListInstanceIDsAsync(request, grpcCallOptions); + + logger.LogListInstanceIds(response.InstanceIds.Count); + + return new WorkflowInstancePage( + response.InstanceIds.ToList().AsReadOnly(), + response.HasContinuationToken ? response.ContinuationToken : null); + } + + /// + public override async Task> GetInstanceHistoryAsync( + string instanceId, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrEmpty(instanceId); + + var request = new grpc.GetInstanceHistoryRequest + { + InstanceId = instanceId + }; + + var grpcCallOptions = CreateCallOptions(cancellationToken); + var response = await grpcClient.GetInstanceHistoryAsync(request, grpcCallOptions); + + var events = response.Events + .Select(ProtoConverters.ToWorkflowHistoryEvent) + .ToList() + .AsReadOnly(); + + logger.LogGetInstanceHistory(instanceId, events.Count); + + return events; + } + + /// + public override async Task RerunWorkflowFromEventAsync( + string sourceInstanceId, + uint eventId, + RerunWorkflowFromEventOptions? options = null, + CancellationToken cancellationToken = default) + { + ArgumentException.ThrowIfNullOrEmpty(sourceInstanceId); + + if (options is { Input: not null, OverwriteInput: false }) + { + throw new ArgumentException( + $"{nameof(RerunWorkflowFromEventOptions.OverwriteInput)} must be true when {nameof(RerunWorkflowFromEventOptions.Input)} is set.", + nameof(options)); + } + + var request = new grpc.RerunWorkflowFromEventRequest + { + SourceInstanceID = sourceInstanceId, + EventID = eventId, + OverwriteInput = options?.OverwriteInput ?? false + }; + + if (options?.NewInstanceId is not null) + { + request.NewInstanceID = options.NewInstanceId; + } + + if (options is { OverwriteInput: true }) + { + request.Input = SerializeToJson(options.Input); + } + + var grpcCallOptions = CreateCallOptions(cancellationToken); + var response = await grpcClient.RerunWorkflowFromEventAsync(request, grpcCallOptions); + + logger.LogRerunWorkflowFromEvent(sourceInstanceId, eventId, response.NewInstanceID); + + return response.NewInstanceID; + } + /// public override ValueTask DisposeAsync() { diff --git a/src/Dapr.Workflow/Client/WorkflowHistoryEvent.cs b/src/Dapr.Workflow/Client/WorkflowHistoryEvent.cs new file mode 100644 index 000000000..82cf3ef54 --- /dev/null +++ b/src/Dapr.Workflow/Client/WorkflowHistoryEvent.cs @@ -0,0 +1,99 @@ +// ------------------------------------------------------------------------ +// Copyright 2026 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; + +namespace Dapr.Workflow.Client; + +/// +/// Represents a single event in a workflow instance's history. +/// +/// The unique event ID within the workflow instance history. +/// The type of history event. +/// The timestamp when the event occurred. +public sealed record WorkflowHistoryEvent( + int EventId, + WorkflowHistoryEventType EventType, + DateTime Timestamp); + +/// +/// Represents the type of a workflow history event. +/// +public enum WorkflowHistoryEventType +{ + /// Unknown event type. + Unknown = 0, + + /// The workflow execution started. + ExecutionStarted, + + /// The workflow execution completed. + ExecutionCompleted, + + /// The workflow execution was terminated. + ExecutionTerminated, + + /// A task (activity) was scheduled. + TaskScheduled, + + /// A task (activity) completed successfully. + TaskCompleted, + + /// A task (activity) failed. + TaskFailed, + + /// A sub-orchestration instance was created. + SubOrchestrationInstanceCreated, + + /// A sub-orchestration instance completed. + SubOrchestrationInstanceCompleted, + + /// A sub-orchestration instance failed. + SubOrchestrationInstanceFailed, + + /// A timer was created. + TimerCreated, + + /// A timer fired. + TimerFired, + + /// An orchestrator started processing. + OrchestratorStarted, + + /// An orchestrator completed processing. + OrchestratorCompleted, + + /// An event was sent to another instance. + EventSent, + + /// An external event was raised. + EventRaised, + + /// A generic event. + GenericEvent, + + /// A history state event. + HistoryState, + + /// The workflow continued as new. + ContinueAsNew, + + /// The workflow execution was suspended. + ExecutionSuspended, + + /// The workflow execution was resumed. + ExecutionResumed, + + /// The workflow execution stalled. + ExecutionStalled, +} diff --git a/src/Dapr.Workflow/Client/WorkflowInstancePage.cs b/src/Dapr.Workflow/Client/WorkflowInstancePage.cs new file mode 100644 index 000000000..229a23f2c --- /dev/null +++ b/src/Dapr.Workflow/Client/WorkflowInstancePage.cs @@ -0,0 +1,27 @@ +// ------------------------------------------------------------------------ +// Copyright 2026 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Collections.Generic; + +namespace Dapr.Workflow.Client; + +/// +/// Represents a page of workflow instance IDs returned by a list operation. +/// +/// The workflow instance IDs in this page. +/// +/// The continuation token used to retrieve the next page, or null if there are no more pages. +/// +public sealed record WorkflowInstancePage( + IReadOnlyList InstanceIds, + string? ContinuationToken); diff --git a/src/Dapr.Workflow/DaprWorkflowClient.cs b/src/Dapr.Workflow/DaprWorkflowClient.cs index bb11009fe..f3b02ec6d 100644 --- a/src/Dapr.Workflow/DaprWorkflowClient.cs +++ b/src/Dapr.Workflow/DaprWorkflowClient.cs @@ -12,6 +12,7 @@ // ------------------------------------------------------------------------ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Dapr.Workflow.Client; @@ -315,6 +316,67 @@ public async Task PurgeInstanceAsync( return await _innerClient.PurgeInstanceAsync(instanceId, cancellation); } + /// + /// Lists workflow instance IDs with optional pagination. + /// + /// + /// The continuation token from a previous call, or null to retrieve the first page. + /// + /// + /// The maximum number of instance IDs to return per page, or null for no limit. + /// + /// Token to cancel the list operation. + /// + /// A page containing the instance IDs and an optional continuation token for the next page. + /// + public async Task ListInstanceIdsAsync( + string? continuationToken = null, + int? pageSize = null, + CancellationToken cancellation = default) + { + if (pageSize.HasValue) + { + ArgumentOutOfRangeException.ThrowIfNegativeOrZero(pageSize.Value); + } + + return await _innerClient.ListInstanceIdsAsync(continuationToken, pageSize, cancellation); + } + + /// + /// Gets the full execution history of a workflow instance. + /// + /// The unique ID of the workflow instance to get history for. + /// Token to cancel the retrieval operation. + /// A read-only list of history events for the workflow instance. + /// Thrown if is null or empty. + public async Task> GetInstanceHistoryAsync( + string instanceId, + CancellationToken cancellation = default) + { + ArgumentException.ThrowIfNullOrEmpty(instanceId); + return await _innerClient.GetInstanceHistoryAsync(instanceId, cancellation); + } + + /// + /// Reruns a workflow from a specific event ID, creating a new workflow instance that replays + /// the source workflow's history up to the specified event. + /// + /// The instance ID of the source workflow to rerun from. + /// The event ID in the source workflow's history to rerun from. + /// Optional configuration for the rerun operation. + /// Token to cancel the rerun operation. + /// The instance ID of the newly created workflow instance. + /// Thrown if is null or empty. + public async Task RerunWorkflowFromEventAsync( + string sourceInstanceId, + uint eventId, + RerunWorkflowFromEventOptions? options = null, + CancellationToken cancellation = default) + { + ArgumentException.ThrowIfNullOrEmpty(sourceInstanceId); + return await _innerClient.RerunWorkflowFromEventAsync(sourceInstanceId, eventId, options, cancellation); + } + /// /// Disposes any unmanaged resources associated with this client. /// diff --git a/src/Dapr.Workflow/IDaprWorkflowClient.cs b/src/Dapr.Workflow/IDaprWorkflowClient.cs index a1369f12f..7a0869f2a 100644 --- a/src/Dapr.Workflow/IDaprWorkflowClient.cs +++ b/src/Dapr.Workflow/IDaprWorkflowClient.cs @@ -12,6 +12,7 @@ // ------------------------------------------------------------------------ using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Dapr.Common; @@ -234,4 +235,49 @@ Task ResumeWorkflowAsync( Task PurgeInstanceAsync( string instanceId, CancellationToken cancellation = default); + + /// + /// Lists workflow instance IDs with optional pagination. + /// + /// + /// The continuation token from a previous call, or null to retrieve the first page. + /// + /// + /// The maximum number of instance IDs to return per page, or null for no limit. + /// + /// Token to cancel the list operation. + /// + /// A page containing the instance IDs and an optional continuation token for the next page. + /// + Task ListInstanceIdsAsync( + string? continuationToken = null, + int? pageSize = null, + CancellationToken cancellation = default); + + /// + /// Gets the full execution history of a workflow instance. + /// + /// The unique ID of the workflow instance to get history for. + /// Token to cancel the retrieval operation. + /// A read-only list of history events for the workflow instance. + /// Thrown if is null or empty. + Task> GetInstanceHistoryAsync( + string instanceId, + CancellationToken cancellation = default); + + /// + /// Reruns a workflow from a specific event ID, creating a new workflow instance that replays + /// the source workflow's history up to the specified event. + /// + /// The instance ID of the source workflow to rerun from. + /// The event ID in the source workflow's history to rerun from. + /// Optional configuration for the rerun operation. + /// Token to cancel the rerun operation. + /// The instance ID of the newly created workflow instance. + /// Thrown if is null or empty. + Task RerunWorkflowFromEventAsync( + string sourceInstanceId, + uint eventId, + RerunWorkflowFromEventOptions? options = null, + CancellationToken cancellation = default); } diff --git a/src/Dapr.Workflow/Logging.cs b/src/Dapr.Workflow/Logging.cs index fa404b59c..a44a2249b 100644 --- a/src/Dapr.Workflow/Logging.cs +++ b/src/Dapr.Workflow/Logging.cs @@ -235,4 +235,13 @@ public static partial void LogWaitForStartException(this ILogger logger, Invalid [LoggerMessage(LogLevel.Debug, "Potential stall: Instance {InstanceId} yielded but scheduled 0 new actions and matched 0 history events")] public static partial void LogWorkflowWorkerOrchestratorStall(this ILogger logger, string instanceId); + + [LoggerMessage(LogLevel.Debug, "Listed workflow instance IDs: {Count} instance(s) returned")] + public static partial void LogListInstanceIds(this ILogger logger, int count); + + [LoggerMessage(LogLevel.Debug, "Retrieved history for workflow '{InstanceId}': {EventCount} event(s)")] + public static partial void LogGetInstanceHistory(this ILogger logger, string instanceId, int eventCount); + + [LoggerMessage(LogLevel.Information, "Rerun workflow from event: source='{SourceInstanceId}', eventId={EventId}, newInstanceId='{NewInstanceId}'")] + public static partial void LogRerunWorkflowFromEvent(this ILogger logger, string sourceInstanceId, uint eventId, string newInstanceId); } diff --git a/test/Dapr.IntegrationTest.Workflow/WorkflowRpcTests.cs b/test/Dapr.IntegrationTest.Workflow/WorkflowRpcTests.cs new file mode 100644 index 000000000..80a5d374f --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/WorkflowRpcTests.cs @@ -0,0 +1,137 @@ +// ------------------------------------------------------------------------ +// Copyright 2026 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Testcontainers.Common; +using Dapr.Testcontainers.Harnesses; +using Dapr.Workflow; +using Dapr.Workflow.Client; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class WorkflowRpcTests +{ + [Fact] + public async Task ListInstanceIds_ShouldReturnScheduledWorkflowInstances() + { + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + + await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(needsActorState: true); + await environment.StartAsync(); + + var harness = new DaprHarnessBuilder(componentsDir) + .WithEnvironment(environment) + .BuildWorkflow(); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => opt.RegisterWorkflow(), + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var client = scope.ServiceProvider.GetRequiredService(); + + // Schedule a workflow and wait for completion + var instanceId = Guid.NewGuid().ToString(); + await client.ScheduleNewWorkflowAsync(nameof(SimpleWorkflow), instanceId, "hello"); + await client.WaitForWorkflowCompletionAsync(instanceId); + + // List instance IDs and verify our workflow appears + var page = await client.ListInstanceIdsAsync(); + + Assert.NotNull(page); + Assert.Contains(instanceId, page.InstanceIds); + } + + [Fact] + public async Task GetInstanceHistory_ShouldReturnHistoryForCompletedWorkflow() + { + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + + await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(needsActorState: true); + await environment.StartAsync(); + + var harness = new DaprHarnessBuilder(componentsDir) + .WithEnvironment(environment) + .BuildWorkflow(); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => + { + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var client = scope.ServiceProvider.GetRequiredService(); + + // Schedule a workflow with an activity and wait for completion + var instanceId = Guid.NewGuid().ToString(); + await client.ScheduleNewWorkflowAsync(nameof(WorkflowWithActivity), instanceId, "test-input"); + var result = await client.WaitForWorkflowCompletionAsync(instanceId); + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + + // Get history and verify it has events + var history = await client.GetInstanceHistoryAsync(instanceId); + + Assert.NotNull(history); + Assert.NotEmpty(history); + // Should contain at least an ExecutionStarted event + Assert.Contains(history, e => e.EventType == WorkflowHistoryEventType.ExecutionStarted); + } + + private sealed class SimpleWorkflow : Workflow + { + public override Task RunAsync(WorkflowContext context, string input) + { + return Task.FromResult($"Processed: {input}"); + } + } + + private sealed class WorkflowWithActivity : Workflow + { + public override async Task RunAsync(WorkflowContext context, string input) + { + return await context.CallActivityAsync(nameof(EchoActivity), input); + } + } + + private sealed class EchoActivity : WorkflowActivity + { + public override Task RunAsync(WorkflowActivityContext context, string input) + { + return Task.FromResult($"Echo: {input}"); + } + } +} diff --git a/test/Dapr.Workflow.Test/Client/ProtoConvertersTests.cs b/test/Dapr.Workflow.Test/Client/ProtoConvertersTests.cs index 16032c334..19f1568d8 100644 --- a/test/Dapr.Workflow.Test/Client/ProtoConvertersTests.cs +++ b/test/Dapr.Workflow.Test/Client/ProtoConvertersTests.cs @@ -163,4 +163,66 @@ public void ToWorkflowMetadata_ShouldMapRuntimeStatus_UsingToRuntimeStatus() Assert.Equal(WorkflowRuntimeStatus.ContinuedAsNew, metadata.RuntimeStatus); } + + [Theory] + [InlineData(HistoryEvent.EventTypeOneofCase.ExecutionStarted, WorkflowHistoryEventType.ExecutionStarted)] + [InlineData(HistoryEvent.EventTypeOneofCase.ExecutionCompleted, WorkflowHistoryEventType.ExecutionCompleted)] + [InlineData(HistoryEvent.EventTypeOneofCase.ExecutionTerminated, WorkflowHistoryEventType.ExecutionTerminated)] + [InlineData(HistoryEvent.EventTypeOneofCase.TaskScheduled, WorkflowHistoryEventType.TaskScheduled)] + [InlineData(HistoryEvent.EventTypeOneofCase.TaskCompleted, WorkflowHistoryEventType.TaskCompleted)] + [InlineData(HistoryEvent.EventTypeOneofCase.TaskFailed, WorkflowHistoryEventType.TaskFailed)] + [InlineData(HistoryEvent.EventTypeOneofCase.TimerCreated, WorkflowHistoryEventType.TimerCreated)] + [InlineData(HistoryEvent.EventTypeOneofCase.TimerFired, WorkflowHistoryEventType.TimerFired)] + [InlineData(HistoryEvent.EventTypeOneofCase.EventRaised, WorkflowHistoryEventType.EventRaised)] + [InlineData(HistoryEvent.EventTypeOneofCase.ExecutionSuspended, WorkflowHistoryEventType.ExecutionSuspended)] + [InlineData(HistoryEvent.EventTypeOneofCase.ExecutionResumed, WorkflowHistoryEventType.ExecutionResumed)] + public void ToHistoryEventType_ShouldMapKnownEventTypes( + HistoryEvent.EventTypeOneofCase protoEventType, + WorkflowHistoryEventType expected) + { + var actual = ProtoConverters.ToHistoryEventType(protoEventType); + + Assert.Equal(expected, actual); + } + + [Fact] + public void ToHistoryEventType_ShouldReturnUnknown_WhenEventTypeIsNone() + { + var actual = ProtoConverters.ToHistoryEventType(HistoryEvent.EventTypeOneofCase.None); + + Assert.Equal(WorkflowHistoryEventType.Unknown, actual); + } + + [Fact] + public void ToWorkflowHistoryEvent_ShouldMapAllFields() + { + var timestamp = Timestamp.FromDateTime(new DateTime(2025, 6, 15, 10, 30, 0, DateTimeKind.Utc)); + + var protoEvent = new HistoryEvent + { + EventId = 42, + Timestamp = timestamp, + ExecutionStarted = new ExecutionStartedEvent { Name = "MyWorkflow" } + }; + + var result = ProtoConverters.ToWorkflowHistoryEvent(protoEvent); + + Assert.Equal(42, result.EventId); + Assert.Equal(WorkflowHistoryEventType.ExecutionStarted, result.EventType); + Assert.Equal(new DateTime(2025, 6, 15, 10, 30, 0, DateTimeKind.Utc), result.Timestamp); + } + + [Fact] + public void ToWorkflowHistoryEvent_ShouldUseMinValue_WhenTimestampIsNull() + { + var protoEvent = new HistoryEvent + { + EventId = 1, + TaskScheduled = new TaskScheduledEvent { Name = "MyActivity" } + }; + + var result = ProtoConverters.ToWorkflowHistoryEvent(protoEvent); + + Assert.Equal(DateTime.MinValue, result.Timestamp); + } } diff --git a/test/Dapr.Workflow.Test/Client/WorkflowGrpcClientTests.cs b/test/Dapr.Workflow.Test/Client/WorkflowGrpcClientTests.cs index ec63997db..c21f7b0ac 100644 --- a/test/Dapr.Workflow.Test/Client/WorkflowGrpcClientTests.cs +++ b/test/Dapr.Workflow.Test/Client/WorkflowGrpcClientTests.cs @@ -358,6 +358,177 @@ public async Task PurgeInstanceAsync_ShouldReturnTrueOnlyWhenDeletedInstanceCoun Assert.Equal(expected, result); } + [Fact] + public async Task ListInstanceIdsAsync_ShouldSendRequestAndReturnPage() + { + var serializer = new StubSerializer(); + ListInstanceIDsRequest? captured = null; + + var grpcClientMock = CreateGrpcClientMock(); + grpcClientMock + .Setup(x => x.ListInstanceIDsAsync(It.IsAny(), It.IsAny())) + .Callback((r, _) => captured = r) + .Returns(CreateAsyncUnaryCall(new ListInstanceIDsResponse + { + InstanceIds = { "id1", "id2", "id3" }, + ContinuationToken = "next-page" + })); + + var client = new WorkflowGrpcClient(grpcClientMock.Object, NullLogger.Instance, serializer); + + var result = await client.ListInstanceIdsAsync(continuationToken: "prev-token", pageSize: 3); + + Assert.NotNull(captured); + Assert.Equal("prev-token", captured!.ContinuationToken); + Assert.Equal(3u, captured.PageSize); + Assert.Equal(3, result.InstanceIds.Count); + Assert.Equal("id1", result.InstanceIds[0]); + Assert.Equal("next-page", result.ContinuationToken); + } + + [Fact] + public async Task ListInstanceIdsAsync_ShouldReturnNullContinuationToken_WhenNoneInResponse() + { + var serializer = new StubSerializer(); + + var grpcClientMock = CreateGrpcClientMock(); + grpcClientMock + .Setup(x => x.ListInstanceIDsAsync(It.IsAny(), It.IsAny())) + .Returns(CreateAsyncUnaryCall(new ListInstanceIDsResponse + { + InstanceIds = { "id1" } + })); + + var client = new WorkflowGrpcClient(grpcClientMock.Object, NullLogger.Instance, serializer); + + var result = await client.ListInstanceIdsAsync(); + + Assert.Single(result.InstanceIds); + Assert.Null(result.ContinuationToken); + } + + [Fact] + public async Task GetInstanceHistoryAsync_ShouldThrowArgumentException_WhenInstanceIdIsNullOrEmpty() + { + var serializer = new StubSerializer(); + var grpcClientMock = CreateGrpcClientMock(); + var client = new WorkflowGrpcClient(grpcClientMock.Object, NullLogger.Instance, serializer); + + await Assert.ThrowsAsync(() => client.GetInstanceHistoryAsync("")); + } + + [Fact] + public async Task GetInstanceHistoryAsync_ShouldReturnConvertedHistoryEvents() + { + var serializer = new StubSerializer(); + GetInstanceHistoryRequest? captured = null; + + var timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime( + new DateTime(2025, 6, 15, 10, 30, 0, DateTimeKind.Utc)); + + var grpcClientMock = CreateGrpcClientMock(); + grpcClientMock + .Setup(x => x.GetInstanceHistoryAsync(It.IsAny(), It.IsAny())) + .Callback((r, _) => captured = r) + .Returns(CreateAsyncUnaryCall(new GetInstanceHistoryResponse + { + Events = + { + new HistoryEvent + { + EventId = 1, + Timestamp = timestamp, + ExecutionStarted = new ExecutionStartedEvent { Name = "MyWorkflow" } + }, + new HistoryEvent + { + EventId = 2, + Timestamp = timestamp, + TaskScheduled = new TaskScheduledEvent { Name = "MyActivity" } + } + } + })); + + var client = new WorkflowGrpcClient(grpcClientMock.Object, NullLogger.Instance, serializer); + + var result = await client.GetInstanceHistoryAsync("i"); + + Assert.NotNull(captured); + Assert.Equal("i", captured!.InstanceId); + Assert.Equal(2, result.Count); + Assert.Equal(1, result[0].EventId); + Assert.Equal(WorkflowHistoryEventType.ExecutionStarted, result[0].EventType); + Assert.Equal(2, result[1].EventId); + Assert.Equal(WorkflowHistoryEventType.TaskScheduled, result[1].EventType); + } + + [Fact] + public async Task RerunWorkflowFromEventAsync_ShouldThrowArgumentException_WhenSourceInstanceIdIsNullOrEmpty() + { + var serializer = new StubSerializer(); + var grpcClientMock = CreateGrpcClientMock(); + var client = new WorkflowGrpcClient(grpcClientMock.Object, NullLogger.Instance, serializer); + + await Assert.ThrowsAsync(() => client.RerunWorkflowFromEventAsync("", 1)); + } + + [Fact] + public async Task RerunWorkflowFromEventAsync_ShouldSendRequestAndReturnNewInstanceId() + { + var serializer = new StubSerializer { SerializeResult = "{\"key\":\"value\"}" }; + RerunWorkflowFromEventRequest? captured = null; + + var grpcClientMock = CreateGrpcClientMock(); + grpcClientMock + .Setup(x => x.RerunWorkflowFromEventAsync(It.IsAny(), It.IsAny())) + .Callback((r, _) => captured = r) + .Returns(CreateAsyncUnaryCall(new RerunWorkflowFromEventResponse { NewInstanceID = "new-instance" })); + + var client = new WorkflowGrpcClient(grpcClientMock.Object, NullLogger.Instance, serializer); + + var options = new RerunWorkflowFromEventOptions + { + NewInstanceId = "custom-id", + Input = new { Key = "value" }, + OverwriteInput = true + }; + + var result = await client.RerunWorkflowFromEventAsync("source-id", 42, options); + + Assert.Equal("new-instance", result); + Assert.NotNull(captured); + Assert.Equal("source-id", captured!.SourceInstanceID); + Assert.Equal(42u, captured.EventID); + Assert.Equal("custom-id", captured.NewInstanceID); + Assert.True(captured.OverwriteInput); + Assert.Equal("{\"key\":\"value\"}", captured.Input); + } + + [Fact] + public async Task RerunWorkflowFromEventAsync_ShouldNotSetOptionalFields_WhenOptionsIsNull() + { + var serializer = new StubSerializer(); + RerunWorkflowFromEventRequest? captured = null; + + var grpcClientMock = CreateGrpcClientMock(); + grpcClientMock + .Setup(x => x.RerunWorkflowFromEventAsync(It.IsAny(), It.IsAny())) + .Callback((r, _) => captured = r) + .Returns(CreateAsyncUnaryCall(new RerunWorkflowFromEventResponse { NewInstanceID = "auto-id" })); + + var client = new WorkflowGrpcClient(grpcClientMock.Object, NullLogger.Instance, serializer); + + var result = await client.RerunWorkflowFromEventAsync("source-id", 5); + + Assert.Equal("auto-id", result); + Assert.NotNull(captured); + Assert.Equal("source-id", captured!.SourceInstanceID); + Assert.Equal(5u, captured.EventID); + Assert.False(captured.HasNewInstanceID); + Assert.False(captured.OverwriteInput); + Assert.Null(captured.Input); + } + [Fact] public async Task DisposeAsync_ShouldCompleteSynchronously() { diff --git a/test/Dapr.Workflow.Test/DaprWorkflowClientTests.cs b/test/Dapr.Workflow.Test/DaprWorkflowClientTests.cs index b424923bf..0498d3756 100644 --- a/test/Dapr.Workflow.Test/DaprWorkflowClientTests.cs +++ b/test/Dapr.Workflow.Test/DaprWorkflowClientTests.cs @@ -11,6 +11,7 @@ // limitations under the License. // ------------------------------------------------------------------------ +using System.Collections.ObjectModel; using Dapr.Workflow.Client; namespace Dapr.Workflow.Test; @@ -207,6 +208,71 @@ public async Task DisposeAsync_ShouldForwardToInner() Assert.True(inner.DisposeCalled); } + [Fact] + public async Task ListInstanceIdsAsync_ShouldForwardToInnerClient() + { + var expectedPage = new WorkflowInstancePage( + new ReadOnlyCollection(new[] { "id1", "id2" }), + "next-token"); + var inner = new CapturingWorkflowClient { ListInstanceIdsResult = expectedPage }; + var client = new DaprWorkflowClient(inner); + + var result = await client.ListInstanceIdsAsync(continuationToken: "token", pageSize: 10); + + Assert.Equal(expectedPage, result); + Assert.Equal("token", inner.LastListContinuationToken); + Assert.Equal(10, inner.LastListPageSize); + } + + [Fact] + public async Task GetInstanceHistoryAsync_ShouldThrowArgumentException_WhenInstanceIdIsNullOrEmpty() + { + var inner = new CapturingWorkflowClient(); + var client = new DaprWorkflowClient(inner); + + await Assert.ThrowsAsync(() => client.GetInstanceHistoryAsync("")); + } + + [Fact] + public async Task GetInstanceHistoryAsync_ShouldForwardToInnerClient() + { + var events = new ReadOnlyCollection(new[] + { + new WorkflowHistoryEvent(1, WorkflowHistoryEventType.ExecutionStarted, DateTime.MinValue) + }); + var inner = new CapturingWorkflowClient { GetInstanceHistoryResult = events }; + var client = new DaprWorkflowClient(inner); + + var result = await client.GetInstanceHistoryAsync("i"); + + Assert.Single(result); + Assert.Equal("i", inner.LastGetHistoryInstanceId); + } + + [Fact] + public async Task RerunWorkflowFromEventAsync_ShouldThrowArgumentException_WhenSourceInstanceIdIsNullOrEmpty() + { + var inner = new CapturingWorkflowClient(); + var client = new DaprWorkflowClient(inner); + + await Assert.ThrowsAsync(() => client.RerunWorkflowFromEventAsync("", 1)); + } + + [Fact] + public async Task RerunWorkflowFromEventAsync_ShouldForwardToInnerClient() + { + var inner = new CapturingWorkflowClient { RerunWorkflowFromEventResult = "new-id" }; + var client = new DaprWorkflowClient(inner); + + var options = new RerunWorkflowFromEventOptions { NewInstanceId = "custom-id" }; + var result = await client.RerunWorkflowFromEventAsync("source-id", 42, options); + + Assert.Equal("new-id", result); + Assert.Equal("source-id", inner.LastRerunSourceInstanceId); + Assert.Equal(42u, inner.LastRerunEventId); + Assert.Equal(options, inner.LastRerunOptions); + } + private sealed class CapturingWorkflowClient : WorkflowClient { public string? LastScheduleName { get; private set; } @@ -243,6 +309,20 @@ private sealed class CapturingWorkflowClient : WorkflowClient public string? LastPurgeInstanceId { get; private set; } public bool PurgeResult { get; set; } + public string? LastListContinuationToken { get; private set; } + public int? LastListPageSize { get; private set; } + public WorkflowInstancePage ListInstanceIdsResult { get; set; } = + new(new ReadOnlyCollection(Array.Empty()), null); + + public string? LastGetHistoryInstanceId { get; private set; } + public IReadOnlyList GetInstanceHistoryResult { get; set; } = + new ReadOnlyCollection(Array.Empty()); + + public string? LastRerunSourceInstanceId { get; private set; } + public uint LastRerunEventId { get; private set; } + public RerunWorkflowFromEventOptions? LastRerunOptions { get; private set; } + public string RerunWorkflowFromEventResult { get; set; } = "new-id"; + public bool DisposeCalled { get; private set; } public override Task ScheduleNewWorkflowAsync( @@ -332,6 +412,36 @@ public override Task PurgeInstanceAsync(string instanceId, CancellationTok return Task.FromResult(PurgeResult); } + public override Task ListInstanceIdsAsync( + string? continuationToken = null, + int? pageSize = null, + CancellationToken cancellationToken = default) + { + LastListContinuationToken = continuationToken; + LastListPageSize = pageSize; + return Task.FromResult(ListInstanceIdsResult); + } + + public override Task> GetInstanceHistoryAsync( + string instanceId, + CancellationToken cancellationToken = default) + { + LastGetHistoryInstanceId = instanceId; + return Task.FromResult(GetInstanceHistoryResult); + } + + public override Task RerunWorkflowFromEventAsync( + string sourceInstanceId, + uint eventId, + RerunWorkflowFromEventOptions? options = null, + CancellationToken cancellationToken = default) + { + LastRerunSourceInstanceId = sourceInstanceId; + LastRerunEventId = eventId; + LastRerunOptions = options; + return Task.FromResult(RerunWorkflowFromEventResult); + } + public override ValueTask DisposeAsync() { DisposeCalled = true;