diff --git a/src/Dapr.Testcontainers/Containers/Dapr/DaprSchedulerContainer.cs b/src/Dapr.Testcontainers/Containers/Dapr/DaprSchedulerContainer.cs index de1532182..3e6b09695 100644 --- a/src/Dapr.Testcontainers/Containers/Dapr/DaprSchedulerContainer.cs +++ b/src/Dapr.Testcontainers/Containers/Dapr/DaprSchedulerContainer.cs @@ -94,9 +94,6 @@ public ValueTask DisposeAsync() { // Remove the data directory if it exists TestDirectoryManager.CleanUpDirectory(_testDirectory); - - // if (Directory.Exists(_hostDataDir)) - // Directory.Delete(_hostDataDir, true); return _container.DisposeAsync(); } } diff --git a/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs b/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs index c7d4a08a4..c491d14bf 100644 --- a/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs +++ b/src/Dapr.Workflow/Worker/Grpc/GrpcProtocolHandler.cs @@ -45,8 +45,8 @@ internal sealed class GrpcProtocolHandler(TaskHubSidecarService.TaskHubSidecarSe /// Handler for activity work items. /// Cancellation token. public async Task StartAsync( - Func> workflowHandler, - Func> activityHandler, + Func> workflowHandler, + Func> activityHandler, CancellationToken cancellationToken) { using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposalCts.Token); @@ -126,8 +126,8 @@ private static async Task DelayOrStopAsync(TimeSpan delay, CancellationToken tok /// private async Task ReceiveLoopAsync( IAsyncStreamReader workItemsStream, - Func> orchestratorHandler, - Func> activityHandler, + Func> orchestratorHandler, + Func> activityHandler, CancellationToken cancellationToken) { // Track active work items for proper exception handling @@ -137,14 +137,16 @@ private async Task ReceiveLoopAsync( { await foreach (var workItem in workItemsStream.ReadAllAsync(cancellationToken)) { + var completionToken = workItem.CompletionToken; + // Dispatch based on work item type var workItemTask = workItem.RequestCase switch { WorkItem.RequestOneofCase.OrchestratorRequest => Task.Run( - () => ProcessWorkflowAsync(workItem.OrchestratorRequest, orchestratorHandler, cancellationToken), + () => ProcessWorkflowAsync(workItem.OrchestratorRequest, completionToken, orchestratorHandler, cancellationToken), cancellationToken), WorkItem.RequestOneofCase.ActivityRequest => Task.Run( - () => ProcessActivityAsync(workItem.ActivityRequest, activityHandler, cancellationToken), + () => ProcessActivityAsync(workItem.ActivityRequest, completionToken, activityHandler, cancellationToken), cancellationToken), _ => Task.Run( () => _logger.LogGrpcProtocolHandlerUnknownWorkItemType(workItem.RequestCase), @@ -188,8 +190,8 @@ private async Task ReceiveLoopAsync( /// /// Processes a workflow request work item. /// - private async Task ProcessWorkflowAsync(OrchestratorRequest request, - Func> handler, CancellationToken cancellationToken) + private async Task ProcessWorkflowAsync(OrchestratorRequest request, string completionToken, + Func> handler, CancellationToken cancellationToken) { var activeCount = Interlocked.Increment(ref _activeWorkItemCount); @@ -197,7 +199,7 @@ private async Task ProcessWorkflowAsync(OrchestratorRequest request, { _logger.LogGrpcProtocolHandlerWorkflowProcessorStart(request.InstanceId, activeCount); - var result = await handler(request); + var result = await handler(request, completionToken); // Send the result back to Dapr await _grpcClient.CompleteOrchestratorTaskAsync(result, cancellationToken: cancellationToken); @@ -227,8 +229,8 @@ private async Task ProcessWorkflowAsync(OrchestratorRequest request, /// /// Processes an activity request work item. /// - private async Task ProcessActivityAsync(ActivityRequest request, - Func> handler, CancellationToken cancellationToken) + private async Task ProcessActivityAsync(ActivityRequest request, string completionToken, + Func> handler, CancellationToken cancellationToken) { var activeCount = Interlocked.Increment(ref _activeWorkItemCount); @@ -236,7 +238,7 @@ private async Task ProcessActivityAsync(ActivityRequest request, { _logger.LogGrpcProtocolHandlerActivityProcessorStart(request.OrchestrationInstance.InstanceId, request.Name, request.TaskId, activeCount); - var result = await handler(request); + var result = await handler(request, completionToken); // Send the result back to Dapr await _grpcClient.CompleteActivityTaskAsync(result, cancellationToken: cancellationToken); @@ -252,7 +254,7 @@ private async Task ProcessActivityAsync(ActivityRequest request, try { - var failureResult = CreateActivityFailureResult(request, ex); + var failureResult = CreateActivityFailureResult(request, completionToken, ex); await _grpcClient.CompleteActivityTaskAsync(failureResult, cancellationToken: cancellationToken); } catch (Exception resultEx) @@ -269,11 +271,12 @@ private async Task ProcessActivityAsync(ActivityRequest request, /// /// Creates a failure response for an activity exception. /// - private static ActivityResponse CreateActivityFailureResult(ActivityRequest request, Exception ex) => + private static ActivityResponse CreateActivityFailureResult(ActivityRequest request, string completionToken, Exception ex) => new() { - InstanceId = request.OrchestrationInstance.InstanceId, + TaskId = request.TaskId, + CompletionToken = completionToken, FailureDetails = new() { ErrorType = ex.GetType().FullName ?? "Exception", diff --git a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs index 8c1298e07..a3a197079 100644 --- a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs +++ b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs @@ -37,6 +37,8 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext private readonly List _externalEventBuffer = []; private readonly Dictionary>> _externalEventSources = new(StringComparer.OrdinalIgnoreCase); private readonly Dictionary> _openTasks = []; + private readonly Dictionary _taskIdToExecutionId = []; + private readonly Dictionary _executionIdToTaskId = new(StringComparer.Ordinal); private readonly SortedDictionary _pendingActions = []; private readonly IWorkflowSerializer _workflowSerializer; private readonly ILogger _logger; @@ -50,6 +52,7 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext /// Key is taskedScheduledId/timerId/etc. as provided by the history event. /// private readonly Dictionary _unmatchedCompletions = []; + private readonly Dictionary _unmatchedCompletionsByExecutionId = new(StringComparer.Ordinal); // Parse instance ID as GUID or generate one @@ -105,11 +108,13 @@ public override async Task CallActivityAsync(string name, object? input = { ArgumentException.ThrowIfNullOrWhiteSpace(name); var taskId = _sequenceNumber++; + var taskExecutionId = CreateTaskExecutionId(taskId, name); var router = CreateRouter(options?.TargetAppId); // If the completion arrived before we registered the task, consume it now - if (_unmatchedCompletions.Remove(taskId, out var earlyCompletion)) + if (_unmatchedCompletionsByExecutionId.Remove(taskExecutionId, out var earlyCompletion) || + _unmatchedCompletions.Remove(taskId, out earlyCompletion)) { _logger.LogDebug("Found early completion in buffer for task {TaskId} ({ActivityName})", taskId, name); return await HandleHistoryMatch(name, earlyCompletion, taskId); @@ -122,13 +127,16 @@ public override async Task CallActivityAsync(string name, object? input = { Name = name, Input = _workflowSerializer.Serialize(input), - Router = router + Router = router, + TaskExecutionId = taskExecutionId }, Router = router }); var tcs = new TaskCompletionSource(); _openTasks.Add(taskId, tcs); + _taskIdToExecutionId[taskId] = taskExecutionId; + _executionIdToTaskId[taskExecutionId] = taskId; var historyEvent = await tcs.Task; return await HandleHistoryMatch(name, historyEvent, taskId); @@ -440,6 +448,17 @@ private void HandleActionCompleted(HistoryEvent historyEvent, int taskId) { tcs.SetResult(historyEvent); _openTasks.Remove(taskId); + RemoveTaskExecutionMapping(taskId); + return; + } + + if (TryGetTaskExecutionId(historyEvent, out var taskExecutionId) && + _executionIdToTaskId.TryGetValue(taskExecutionId, out var executionTaskId) && + _openTasks.TryGetValue(executionTaskId, out var executionTcs)) + { + executionTcs.SetResult(historyEvent); + _openTasks.Remove(executionTaskId); + RemoveTaskExecutionMapping(executionTaskId); return; } @@ -454,6 +473,21 @@ private void HandleActionCompleted(HistoryEvent historyEvent, int taskId) // Buffer the completion so the next replay pass can consume it when the workflow schedules/await the task. // Ignore duplicates (first completion wins) + if (TryGetTaskExecutionId(historyEvent, out var unmatchedExecutionId)) + { + if (!_unmatchedCompletionsByExecutionId.TryAdd(unmatchedExecutionId, historyEvent)) + { + _logger.LogWarning( + "Received completion for unknown taskId {TaskId} in instance {InstanceId}. OpenTasks=[{OpenTasks}] EventType={EventType}", + taskId, + InstanceId, + string.Join(",", _openTasks.Keys), + historyEvent.EventTypeCase); + } + + return; + } + if (!_unmatchedCompletions.TryAdd(taskId, historyEvent)) { // If we get here, the runtime delivered a completion for an unknown task id. @@ -519,6 +553,33 @@ private Task HandleFailedActivityFromHistory(string activityName, TaskFail throw CreateTaskFailedException(failed); } + private string CreateTaskExecutionId(int taskId, string name) + { + var seed = $"{InstanceId}|activity|{taskId}|{name}"; + return CreateGuidFromName(_instanceGuid, Encoding.UTF8.GetBytes(seed)).ToString("N"); + } + + private static bool TryGetTaskExecutionId(HistoryEvent historyEvent, out string taskExecutionId) + { + taskExecutionId = historyEvent switch + { + { TaskCompleted: { } completed } => completed.TaskExecutionId, + { TaskFailed: { } failed } => failed.TaskExecutionId, + _ => string.Empty + }; + + return !string.IsNullOrWhiteSpace(taskExecutionId); + } + + private void RemoveTaskExecutionMapping(int taskId) + { + if (_taskIdToExecutionId.TryGetValue(taskId, out var executionId)) + { + _taskIdToExecutionId.Remove(taskId); + _executionIdToTaskId.Remove(executionId); + } + } + /// /// Handles a child workflow that completed in the workflow history. /// diff --git a/src/Dapr.Workflow/Worker/WorkflowWorker.cs b/src/Dapr.Workflow/Worker/WorkflowWorker.cs index c3ec7af7c..8df73c53d 100644 --- a/src/Dapr.Workflow/Worker/WorkflowWorker.cs +++ b/src/Dapr.Workflow/Worker/WorkflowWorker.cs @@ -29,12 +29,18 @@ namespace Dapr.Workflow.Worker; /// /// Background service that processes workflow and activity work items from the Dapr sidecar. /// -internal sealed class WorkflowWorker(TaskHubSidecarService.TaskHubSidecarServiceClient grpcClient, IWorkflowsFactory workflowsFactory, ILoggerFactory loggerFactory, IWorkflowSerializer workflowSerializer, IServiceProvider serviceProvider, WorkflowRuntimeOptions options) : BackgroundService +internal sealed class WorkflowWorker( + TaskHubSidecarService.TaskHubSidecarServiceClient grpcClient, + IWorkflowsFactory workflowsFactory, + ILoggerFactory loggerFactory, + IWorkflowSerializer workflowSerializer, + IServiceProvider serviceProvider, + WorkflowRuntimeOptions options) : BackgroundService { private readonly TaskHubSidecarService.TaskHubSidecarServiceClient _grpcClient = grpcClient ?? throw new ArgumentNullException(nameof(grpcClient)); private readonly IWorkflowsFactory _workflowsFactory = workflowsFactory ?? throw new ArgumentNullException(nameof(workflowsFactory)); private readonly IServiceProvider _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); - private readonly ILogger _logger = loggerFactory?.CreateLogger() ?? throw new ArgumentNullException(nameof(loggerFactory)); + private readonly ILogger _logger = loggerFactory.CreateLogger() ?? throw new ArgumentNullException(nameof(loggerFactory)); private readonly WorkflowRuntimeOptions _options = options ?? throw new ArgumentNullException(nameof(options)); private readonly IWorkflowSerializer _serializer = workflowSerializer ?? throw new ArgumentNullException(nameof(workflowSerializer)); @@ -66,7 +72,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) } } - private async Task HandleOrchestratorResponseAsync(OrchestratorRequest request) + private async Task HandleOrchestratorResponseAsync(OrchestratorRequest request, string completionToken) { _logger.LogWorkerWorkflowHandleOrchestratorRequestStart(request.InstanceId); @@ -162,7 +168,11 @@ private async Task HandleOrchestratorResponseAsync(Orchest context.ProcessEvents(request.NewEvents, false); // Get all pending actions from the context - var response = new OrchestratorResponse { InstanceId = request.InstanceId }; + var response = new OrchestratorResponse + { + InstanceId = request.InstanceId, + CompletionToken = completionToken + }; // Add all actions that were scheduled during workflow execution response.Actions.AddRange(context.PendingActions); @@ -215,6 +225,7 @@ private async Task HandleOrchestratorResponseAsync(Orchest OrchestrationStatus = OrchestrationStatus.Failed, FailureDetails = new() { + IsNonRetriable = true, ErrorType = ex.GetType().FullName ?? "Exception", ErrorMessage = ex.Message, StackTrace = ex.StackTrace ?? string.Empty @@ -253,7 +264,7 @@ private async Task HandleOrchestratorResponseAsync(Orchest } } - private async Task HandleActivityResponseAsync(ActivityRequest request) + private async Task HandleActivityResponseAsync(ActivityRequest request, string completionToken) { _logger.LogWorkerWorkflowHandleActivityRequestStart(request.Name, request.OrchestrationInstance?.InstanceId, request.TaskId); @@ -272,6 +283,7 @@ private async Task HandleActivityResponseAsync(ActivityRequest { InstanceId = request.OrchestrationInstance?.InstanceId ?? string.Empty, TaskId = request.TaskId, + CompletionToken = completionToken, FailureDetails = new() { ErrorType = "ActivityNotFoundException", @@ -310,7 +322,8 @@ private async Task HandleActivityResponseAsync(ActivityRequest { InstanceId = request.OrchestrationInstance?.InstanceId ?? string.Empty, TaskId = request.TaskId, - Result = outputJson + Result = outputJson, + CompletionToken = completionToken }; } catch (Exception ex) @@ -321,6 +334,7 @@ private async Task HandleActivityResponseAsync(ActivityRequest { InstanceId = request.OrchestrationInstance?.InstanceId ?? string.Empty, TaskId = request.TaskId, + CompletionToken = completionToken, FailureDetails = new() { ErrorType = ex.GetType().FullName ?? "Exception", diff --git a/test/Dapr.IntegrationTest.Workflow/ActivityCompletionAcknowledgementTests.cs b/test/Dapr.IntegrationTest.Workflow/ActivityCompletionAcknowledgementTests.cs new file mode 100644 index 000000000..3f25446bb --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/ActivityCompletionAcknowledgementTests.cs @@ -0,0 +1,92 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Collections.Concurrent; +using Dapr.Testcontainers.Common; +using Dapr.Testcontainers.Common.Options; +using Dapr.Testcontainers.Harnesses; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class ActivityCompletionAcknowledgementTests +{ + [Fact] + public async Task ActivityCompletion_ShouldNotBeRetried_WhenAcknowledged() + { + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(needsActorState: true); + await environment.StartAsync(); + + var harness = new DaprHarnessBuilder(options, environment).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + CountingActivity.Reset(workflowInstanceId); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(SingleActivityWorkflow), workflowInstanceId, "start"); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId, true); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var executionCount = result.ReadOutputAs(); + + Assert.Equal(1, executionCount); + Assert.Equal(1, CountingActivity.GetCount(workflowInstanceId)); + } + + private sealed class CountingActivity : WorkflowActivity + { + private static readonly ConcurrentDictionary Counts = new(StringComparer.Ordinal); + + public override Task RunAsync(WorkflowActivityContext context, string input) + { + var count = Counts.AddOrUpdate(context.InstanceId, _ => 1, (_, current) => current + 1); + return Task.FromResult(count); + } + + public static void Reset(string instanceId) => Counts.TryRemove(instanceId, out _); + + public static int GetCount(string instanceId) => + Counts.TryGetValue(instanceId, out var count) ? count : 0; + } + + private sealed class SingleActivityWorkflow : Workflow + { + public override Task RunAsync(WorkflowContext context, string input) => + context.CallActivityAsync(nameof(CountingActivity), input); + } +} diff --git a/test/Dapr.IntegrationTest.Workflow/ActivityCompletionLoadTests.cs b/test/Dapr.IntegrationTest.Workflow/ActivityCompletionLoadTests.cs new file mode 100644 index 000000000..708a4a485 --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/ActivityCompletionLoadTests.cs @@ -0,0 +1,155 @@ +// ------------------------------------------------------------------------ +// Copyright 2025 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Collections.Concurrent; +using Dapr.Testcontainers.Common; +using Dapr.Testcontainers.Common.Options; +using Dapr.Testcontainers.Harnesses; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace Dapr.IntegrationTest.Workflow; + +public sealed class ActivityCompletionLoadTests +{ + [Fact] + public async Task ActivityCompletions_ShouldBeAcknowledged_UnderLoad() + { + const int activityCount = 50; + var options = new DaprRuntimeOptions(); + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + var loggerProvider = new InMemoryLoggerProvider(); + + await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(needsActorState: true); + await environment.StartAsync(); + + var harness = new DaprHarnessBuilder(options, environment).BuildWorkflow(componentsDir); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + configureRuntime: opt => + { + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, clientBuilder) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + clientBuilder.UseGrpcEndpoint(grpcEndpoint); + }); + builder.Logging.AddProvider(loggerProvider); + }) + .BuildAndStartAsync(); + + CountingActivity.Reset(workflowInstanceId); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(FanOutWorkflow), workflowInstanceId, activityCount); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(workflowInstanceId, true); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + var completedCount = result.ReadOutputAs(); + Assert.Equal(activityCount, completedCount); + + for (var i = 0; i < activityCount; i++) + { + Assert.True(CountingActivity.GetCount(workflowInstanceId, i) >= 1); + } + + var unexpectedWarnings = loggerProvider.Entries + .Where(entry => entry.Level >= LogLevel.Warning) + .Where(entry => entry.Message.Contains("Received completion for unknown taskId", StringComparison.Ordinal)) + .ToList(); + + Assert.Empty(unexpectedWarnings); + } + + private sealed class CountingActivity : WorkflowActivity + { + private static readonly ConcurrentDictionary Counts = new(StringComparer.Ordinal); + + public override Task RunAsync(WorkflowActivityContext context, int input) + { + var key = BuildKey(context.InstanceId, input); + var count = Counts.AddOrUpdate(key, _ => 1, (_, current) => current + 1); + return Task.FromResult(count); + } + + public static void Reset(string instanceId) + { + foreach (var key in Counts.Keys.Where(k => k.StartsWith(instanceId + ":", StringComparison.Ordinal))) + { + Counts.TryRemove(key, out _); + } + } + + public static int GetCount(string instanceId, int input) => Counts.GetValueOrDefault(BuildKey(instanceId, input), 0); + + private static string BuildKey(string instanceId, int input) => $"{instanceId}:{input}"; + } + + private sealed class FanOutWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, int input) + { + var tasks = new Task[input]; + for (var i = 0; i < input; i++) + { + tasks[i] = context.CallActivityAsync(nameof(CountingActivity), i); + } + + var results = await Task.WhenAll(tasks); + return results.Length; + } + } + + private sealed class InMemoryLoggerProvider : ILoggerProvider + { + private readonly ConcurrentQueue _entries = new(); + + public IEnumerable Entries => _entries; + + public ILogger CreateLogger(string categoryName) => new InMemoryLogger(categoryName, _entries); + + public void Dispose() + { + } + + internal sealed record LogEntry(LogLevel Level, string Category, string Message, Exception? Exception); + + private sealed class InMemoryLogger(string categoryName, ConcurrentQueue entries) : ILogger + { + public IDisposable? BeginScope(TState state) where TState : notnull => null; + + public bool IsEnabled(LogLevel logLevel) => true; + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception? exception, + Func formatter) + { + if (formatter is null) + return; + + var message = formatter(state, exception); + entries.Enqueue(new LogEntry(logLevel, categoryName, message, exception)); + } + } + } +} diff --git a/test/Dapr.IntegrationTest.Workflow/TaskExecutionKeyTests.cs b/test/Dapr.IntegrationTest.Workflow/TaskExecutionKeyTests.cs index 7da4deefa..17658e4e8 100644 --- a/test/Dapr.IntegrationTest.Workflow/TaskExecutionKeyTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/TaskExecutionKeyTests.cs @@ -68,7 +68,7 @@ public async Task ActivityContext_ShouldContainTaskExecutionKey() // Assert that the TaskExecutionKey is present Assert.False(string.IsNullOrWhiteSpace(taskExecutionKey), "TaskExecutionKey should not be null or empty."); } - + private sealed class GetTaskExecutionKeyActivity : WorkflowActivity { public override Task RunAsync(WorkflowActivityContext context, string input) diff --git a/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs b/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs index b40eb3eb9..798b53bcc 100644 --- a/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/Grpc/GrpcProtocolHandlerTests.cs @@ -29,8 +29,8 @@ private static TaskCompletionSource CreateTcs() => private static async Task RunHandlerUntilAsync( GrpcProtocolHandler handler, - Func> workflowHandler, - Func> activityHandler, + Func> workflowHandler, + Func> activityHandler, Task until, TimeSpan timeout) { @@ -50,8 +50,8 @@ private static async Task RunHandlerUntilAsync( private static async Task RunHandlerUntilAsync( GrpcProtocolHandler handler, - Func> workflowHandler, - Func> activityHandler, + Func> workflowHandler, + Func> activityHandler, Func untilCondition, TimeSpan timeout, TimeSpan? pollInterval = null) @@ -142,8 +142,8 @@ public async Task StartAsync_ShouldCompleteOrchestratorTask_ForOrchestratorWorkI await RunHandlerUntilAsync( handler, - workflowHandler: req => Task.FromResult(new OrchestratorResponse { InstanceId = req.InstanceId }), - activityHandler: _ => Task.FromResult(new ActivityResponse()), + workflowHandler: (req, _) => Task.FromResult(new OrchestratorResponse { InstanceId = req.InstanceId }), + activityHandler: (_,_) => Task.FromResult(new ActivityResponse()), until: completedTcs.Task, timeout: TimeSpan.FromSeconds(2)); @@ -155,6 +155,7 @@ await RunHandlerUntilAsync( public async Task StartAsync_ShouldCompleteActivityTask_ForActivityWorkItem() { var grpcClientMock = CreateGrpcClientMock(); + const string completionToken = "abc"; var workItems = new[] { @@ -165,7 +166,8 @@ public async Task StartAsync_ShouldCompleteActivityTask_ForActivityWorkItem() Name = "act", TaskId = 42, OrchestrationInstance = new OrchestrationInstance { InstanceId = "i-2" } - } + }, + CompletionToken = completionToken } }; @@ -184,12 +186,13 @@ public async Task StartAsync_ShouldCompleteActivityTask_ForActivityWorkItem() await RunHandlerUntilAsync( handler, - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: req => Task.FromResult(new ActivityResponse + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (req, tok) => Task.FromResult(new ActivityResponse { InstanceId = req.OrchestrationInstance.InstanceId, TaskId = req.TaskId, - Result = "ok" + Result = "ok", + CompletionToken = tok }), until: completedTcs.Task, timeout: TimeSpan.FromSeconds(2)); @@ -198,6 +201,7 @@ await RunHandlerUntilAsync( Assert.Equal("i-2", completed.InstanceId); Assert.Equal(42, completed.TaskId); Assert.Equal("ok", completed.Result); + Assert.Equal(completionToken, completed.CompletionToken); } [Fact] @@ -228,8 +232,8 @@ public async Task StartAsync_ShouldSendFailureResult_WhenOrchestratorHandlerThro await RunHandlerUntilAsync( handler, - workflowHandler: _ => throw new InvalidOperationException("boom"), - activityHandler: _ => Task.FromResult(new ActivityResponse()), + workflowHandler: (_,_) => throw new InvalidOperationException("boom"), + activityHandler: (_,_) => Task.FromResult(new ActivityResponse()), until: completedTcs.Task, timeout: TimeSpan.FromSeconds(2)); @@ -276,8 +280,8 @@ public async Task StartAsync_ShouldSendGetWorkItemsRequest_WithConfiguredConcurr await RunHandlerUntilAsync( handler, - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => Task.FromResult(new ActivityResponse()), + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (_,_) => Task.FromResult(new ActivityResponse()), untilCondition: () => captured is not null, timeout: TimeSpan.FromSeconds(2)); @@ -303,8 +307,8 @@ public async Task StartAsync_ShouldReturnWithoutThrowing_WhenGrpcStreamIsCancell cts.Cancel(); await handler.StartAsync( - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => Task.FromResult(new ActivityResponse()), + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (_,_) => Task.FromResult(new ActivityResponse()), cancellationToken: cts.Token); // Since we were already canceled, we shouldn't even attempt to connect. @@ -346,15 +350,15 @@ public async Task StartAsync_ShouldSendActivityFailureResult_WhenActivityHandler await RunHandlerUntilAsync( handler, - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => throw new InvalidOperationException("boom"), + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (_,_) => throw new InvalidOperationException("boom"), until: sentTcs.Task, timeout: TimeSpan.FromSeconds(2)); var sent = await sentTcs.Task; Assert.Equal("i-1", sent.InstanceId); - Assert.Equal(0, sent.TaskId); + Assert.True(sent.TaskId > 0); Assert.NotNull(sent.FailureDetails); Assert.Contains(nameof(InvalidOperationException), sent.FailureDetails.ErrorType); Assert.Contains("boom", sent.FailureDetails.ErrorMessage); @@ -393,8 +397,8 @@ public async Task StartAsync_ShouldNotThrow_WhenSendingActivityFailureResultAlso await RunHandlerUntilAsync( handler, - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => throw new Exception("boom"), + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (_,_) => throw new Exception("boom"), untilCondition: () => completeAttempted, timeout: TimeSpan.FromSeconds(2)); @@ -434,8 +438,8 @@ public async Task StartAsync_ShouldUseCreateActivityFailureResult_WithNullStackT await RunHandlerUntilAsync( handler, - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => throw new NullStackTraceException("boom"), + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (_,_) => throw new NullStackTraceException("boom"), until: sentTcs.Task, timeout: TimeSpan.FromSeconds(2)); @@ -478,8 +482,8 @@ public async Task StartAsync_ShouldNotCallCompleteActivityTask_WhenActivityHandl using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); await handler.StartAsync( - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (_,_) => { // Make StartAsync's linked token "IsCancellationRequested == true" cts.Cancel(); @@ -519,12 +523,12 @@ public async Task StartAsync_ShouldNotCallCompleteOrchestratorTask_WhenWorkflowH using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2)); await handler.StartAsync( - workflowHandler: _ => + workflowHandler: (_,_) => { cts.Cancel(); throw new OperationCanceledException(cts.Token); }, - activityHandler: _ => Task.FromResult(new ActivityResponse()), + activityHandler: (_,_) => Task.FromResult(new ActivityResponse()), cancellationToken: cts.Token); grpcClientMock.Verify( @@ -564,8 +568,8 @@ public async Task StartAsync_ShouldCleanupCompletedActiveWorkItems_WhenActiveWor await RunHandlerUntilAsync( handler, - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: req => Task.FromResult(new ActivityResponse + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (req, _) => Task.FromResult(new ActivityResponse { InstanceId = req.OrchestrationInstance.InstanceId, TaskId = req.TaskId, @@ -605,8 +609,8 @@ public async Task StartAsync_ShouldNotThrow_WhenWorkflowHandlerThrows_AndSending await RunHandlerUntilAsync( handler, - workflowHandler: _ => throw new InvalidOperationException("boom"), - activityHandler: _ => Task.FromResult(new ActivityResponse()), + workflowHandler: (_,_) => throw new InvalidOperationException("boom"), + activityHandler: (_,_) => Task.FromResult(new ActivityResponse()), untilCondition: () => completeAttempted, timeout: TimeSpan.FromSeconds(2)); @@ -618,10 +622,10 @@ public async Task StartAsync_ShouldHandleUnknownWorkItemType_AndWaitForActiveTas { var grpcClientMock = CreateGrpcClientMock(); - var workItems = new[] - { - new WorkItem() // RequestCase = None - }; + WorkItem[] workItems = + [ + new() // RequestCase = None + ]; var getWorkItemsCalled = false; @@ -634,8 +638,8 @@ public async Task StartAsync_ShouldHandleUnknownWorkItemType_AndWaitForActiveTas await RunHandlerUntilAsync( handler, - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => Task.FromResult(new ActivityResponse()), + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (_,_) => Task.FromResult(new ActivityResponse()), untilCondition: () => getWorkItemsCalled, timeout: TimeSpan.FromSeconds(2)); @@ -659,8 +663,8 @@ public async Task StartAsync_ShouldRethrow_WhenReceiveLoopThrowsBeforeAnyItemsAr await RunHandlerUntilAsync( handler, - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => Task.FromResult(new ActivityResponse()), + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (_,_) => Task.FromResult(new ActivityResponse()), untilCondition: () => Volatile.Read(ref getWorkItemsCalls) >= 1, timeout: TimeSpan.FromSeconds(2)); @@ -691,8 +695,8 @@ public async Task StartAsync_ShouldRethrow_WhenReceiveLoopThrowsAfterFirstItemIs // and can be canceled cleanly by the test harness. await RunHandlerUntilAsync( handler, - workflowHandler: _ => Task.FromResult(new OrchestratorResponse()), - activityHandler: _ => Task.FromResult(new ActivityResponse()), + workflowHandler: (_,_) => Task.FromResult(new OrchestratorResponse()), + activityHandler: (_,_) => Task.FromResult(new ActivityResponse()), untilCondition: () => Volatile.Read(ref getWorkItemsCalls) >= 1, timeout: TimeSpan.FromSeconds(2)); diff --git a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs index d5bfde61d..5bba48d50 100644 --- a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs +++ b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs @@ -285,6 +285,42 @@ public async Task CallActivityAsync_ShouldReturnCompletedResult_FromHistoryTaskC Assert.Empty(context.PendingActions); } + [Fact] + public async Task CallActivityAsync_ShouldMatchCompletion_WhenTaskScheduledIdDoesNotMatch() + { + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + + var context = new WorkflowOrchestrationContext( + name: "wf", + instanceId: "i", + currentUtcDateTime: new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), + workflowSerializer: serializer, + loggerFactory: NullLoggerFactory.Instance); + + var task = context.CallActivityAsync("Any"); + var action = Assert.Single(context.PendingActions); + Assert.NotNull(action.ScheduleTask); + Assert.False(string.IsNullOrWhiteSpace(action.ScheduleTask.TaskExecutionId)); + + var history = new[] + { + new HistoryEvent + { + TaskCompleted = new TaskCompletedEvent + { + TaskScheduledId = 123, + TaskExecutionId = action.ScheduleTask.TaskExecutionId, + Result = "\"ok\"" + } + } + }; + + context.ProcessEvents(history, true); + + var result = await task; + Assert.Equal("ok", result); + } + [Fact] public async Task CallActivityAsync_ShouldReturnCompletedResult_FromCallFooTaskCompletedFirst() { diff --git a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs index e509abcbf..266477809 100644 --- a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs @@ -1186,6 +1186,8 @@ public async Task HandleActivityResponseAsync_ShouldUseEmptyInstanceId_WhenOrche Assert.Equal(string.Empty, response.Result); } + private const string CompletionTokenValue = "abc123"; + private static async Task InvokeExecuteAsync(WorkflowWorker worker, CancellationToken token) { var method = typeof(WorkflowWorker).GetMethod("ExecuteAsync", BindingFlags.Instance | BindingFlags.NonPublic); @@ -1200,7 +1202,7 @@ private static async Task InvokeHandleOrchestratorResponse var method = typeof(WorkflowWorker).GetMethod("HandleOrchestratorResponseAsync", BindingFlags.Instance | BindingFlags.NonPublic); Assert.NotNull(method); - var task = (Task)method!.Invoke(worker, [request])!; + var task = (Task)method!.Invoke(worker, [request, CompletionTokenValue])!; return await task; } @@ -1209,7 +1211,7 @@ private static async Task InvokeHandleActivityResponseAsync(Wo var method = typeof(WorkflowWorker).GetMethod("HandleActivityResponseAsync", BindingFlags.Instance | BindingFlags.NonPublic); Assert.NotNull(method); - var task = (Task)method!.Invoke(worker, [request])!; + var task = (Task)method!.Invoke(worker, [request, CompletionTokenValue])!; return await task; }