diff --git a/src/Dapr.Workflow/Client/WorkflowGrpcClient.cs b/src/Dapr.Workflow/Client/WorkflowGrpcClient.cs index 64e0b713d..810813b22 100644 --- a/src/Dapr.Workflow/Client/WorkflowGrpcClient.cs +++ b/src/Dapr.Workflow/Client/WorkflowGrpcClient.cs @@ -245,14 +245,22 @@ public override async Task ListInstanceIdsAsync( request.PageSize = (uint)pageSize.Value; } - var grpcCallOptions = CreateCallOptions(cancellationToken); - var response = await grpcClient.ListInstanceIDsAsync(request, grpcCallOptions); + try + { + var grpcCallOptions = CreateCallOptions(cancellationToken); + var response = await grpcClient.ListInstanceIDsAsync(request, grpcCallOptions); - logger.LogListInstanceIds(response.InstanceIds.Count); + logger.LogListInstanceIds(response.InstanceIds.Count); - return new WorkflowInstancePage( - response.InstanceIds.ToList().AsReadOnly(), - response.HasContinuationToken ? response.ContinuationToken : null); + return new WorkflowInstancePage( + response.InstanceIds.ToList().AsReadOnly(), + response.HasContinuationToken ? response.ContinuationToken : null); + } + catch (RpcException ex) when (IsRpcMethodNotSupportedByRuntime(ex)) + { + throw new NotSupportedException( + "ListInstanceIDs is not supported by the current Dapr runtime version. Please upgrade to a newer Dapr release.", ex); + } } /// @@ -267,17 +275,25 @@ public override async Task> GetInstanceHisto InstanceId = instanceId }; - var grpcCallOptions = CreateCallOptions(cancellationToken); - var response = await grpcClient.GetInstanceHistoryAsync(request, grpcCallOptions); + try + { + var grpcCallOptions = CreateCallOptions(cancellationToken); + var response = await grpcClient.GetInstanceHistoryAsync(request, grpcCallOptions); - var events = response.Events - .Select(ProtoConverters.ToWorkflowHistoryEvent) - .ToList() - .AsReadOnly(); + var events = response.Events + .Select(ProtoConverters.ToWorkflowHistoryEvent) + .ToList() + .AsReadOnly(); - logger.LogGetInstanceHistory(instanceId, events.Count); + logger.LogGetInstanceHistory(instanceId, events.Count); - return events; + return events; + } + catch (RpcException ex) when (IsRpcMethodNotSupportedByRuntime(ex)) + { + throw new NotSupportedException( + "GetInstanceHistory is not supported by the current Dapr runtime version. Please upgrade to a newer Dapr release.", ex); + } } /// @@ -313,12 +329,20 @@ public override async Task RerunWorkflowFromEventAsync( request.Input = SerializeToJson(options.Input); } - var grpcCallOptions = CreateCallOptions(cancellationToken); - var response = await grpcClient.RerunWorkflowFromEventAsync(request, grpcCallOptions); + try + { + var grpcCallOptions = CreateCallOptions(cancellationToken); + var response = await grpcClient.RerunWorkflowFromEventAsync(request, grpcCallOptions); - logger.LogRerunWorkflowFromEvent(sourceInstanceId, eventId, response.NewInstanceID); + logger.LogRerunWorkflowFromEvent(sourceInstanceId, eventId, response.NewInstanceID); - return response.NewInstanceID; + return response.NewInstanceID; + } + catch (RpcException ex) when (IsRpcMethodNotSupportedByRuntime(ex)) + { + throw new NotSupportedException( + "RerunWorkflowFromEvent is not supported by the current Dapr runtime version. Please upgrade to a newer Dapr release.", ex); + } } /// @@ -337,4 +361,18 @@ private static bool IsTerminalStatus(WorkflowRuntimeStatus status) => status is WorkflowRuntimeStatus.Completed or WorkflowRuntimeStatus.Failed or WorkflowRuntimeStatus.Terminated; + + /// + /// Returns true when the indicates that the Dapr sidecar does not + /// implement the requested gRPC method and fell back to its service-invocation proxy, which then + /// failed because the workflow client never sends a dapr-app-id header. This pattern + /// occurs on older Dapr runtime versions that pre-date the GetInstanceHistory, + /// ListInstanceIDs, and RerunWorkflowFromEvent RPCs. + /// The sidecar emits "failed to proxy request: required metadata dapr-callee-app-id or dapr-app-id not found" + /// in this case. + /// + private static bool IsRpcMethodNotSupportedByRuntime(RpcException ex) => + ex.StatusCode == StatusCode.Unknown && + ex.Status.Detail.Contains("required metadata", StringComparison.OrdinalIgnoreCase) && + ex.Status.Detail.Contains("dapr-app-id", StringComparison.OrdinalIgnoreCase); } diff --git a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs index 94dc0d7d8..57c84f5e6 100644 --- a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs +++ b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs @@ -74,7 +74,6 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext private bool _isReplaying; private bool _turnInitialized; private bool _preserveUnprocessedEvents; - public WorkflowOrchestrationContext(string name, string instanceId, DateTime currentUtcDateTime, IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker, string? appId = null, string? executionId = null) @@ -360,20 +359,25 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce } }; - // Do NOT snapshot _externalEventBuffer here. ContinueAsNew is called from within - // workflow execution, which happens during ProcessEvents. Events arriving later in - // the same NewEvents batch will be buffered AFTER this point and would be missed. - // FinalizeCarryoverEvents() is called after all ProcessEvents calls are complete. + // The Dapr sidecar's CarryoverEvents field strips the Input when re-delivering events, + // causing all carryover payloads to deserialize as default(T). Instead, we re-queue + // unprocessed events as SendEvent actions to self in FinalizeCarryoverEvents(), which + // preserves the original Input values and avoids double-delivery. _preserveUnprocessedEvents = preserveUnprocessedEvents; _pendingActions.Add(action.Id, action); } /// - /// Populates CarryoverEvents on any pending ContinuedAsNew action using the - /// final state of _externalEventBuffer. Must be called after all ProcessEvents - /// calls for the current turn are complete, so that events arriving later in the same - /// NewEvents batch are included. + /// Re-queues unprocessed external events as SendEvent actions to this instance + /// so that the new execution (after ContinuedAsNew) receives them with their + /// original input values intact. /// + /// + /// Using the gRPC CarryoverEvents field causes the Dapr sidecar to strip the + /// Input of each re-delivered event, so all payloads deserialize as + /// default(T). Emitting SendEvent actions to self avoids that stripping + /// because the event data travels through the sidecar's normal event queue path. + /// internal void FinalizeCarryoverEvents() { if (!_preserveUnprocessedEvents || _externalEventBuffer.Count == 0) @@ -383,7 +387,20 @@ internal void FinalizeCarryoverEvents() { if (action.CompleteOrchestration?.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew) { - action.CompleteOrchestration.CarryoverEvents.AddRange(_externalEventBuffer); + foreach (var bufferedEvent in _externalEventBuffer) + { + var sendId = _sequenceNumber++; + _pendingActions.Add(sendId, new OrchestratorAction + { + Id = sendId, + SendEvent = new SendEventAction + { + Instance = new OrchestrationInstance { InstanceId = InstanceId }, + Name = bufferedEvent.EventRaised.Name, + Data = bufferedEvent.EventRaised.Input + } + }); + } return; } } diff --git a/test/Dapr.IntegrationTest.Workflow.Versioning/CombinedVersioningIntegrationTests.cs b/test/Dapr.IntegrationTest.Workflow.Versioning/CombinedVersioningIntegrationTests.cs index fcdae72a3..c97c3a6b8 100644 --- a/test/Dapr.IntegrationTest.Workflow.Versioning/CombinedVersioningIntegrationTests.cs +++ b/test/Dapr.IntegrationTest.Workflow.Versioning/CombinedVersioningIntegrationTests.cs @@ -16,6 +16,7 @@ using Dapr.Testcontainers.Common.Options; using Dapr.Testcontainers.Common.Testing; using Dapr.Testcontainers.Harnesses; +using Dapr.Testcontainers.Xunit.Attributes; using Dapr.Workflow; using Dapr.Workflow.Versioning; using Grpc.Core; @@ -29,7 +30,7 @@ public sealed class CombinedVersioningIntegrationTests private const string CanonicalWorkflowName = "CombinedVersionedWorkflow"; private const string ResumeEventName = "resume"; - [Fact] + [MinimumDaprRuntimeFact("1.17")] public async Task ShouldCombinePatchAndNameBasedVersioning() { var instanceIdV1 = Guid.NewGuid().ToString("N"); diff --git a/test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs b/test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs index 041f6ca0d..6a2fb6767 100644 --- a/test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs @@ -34,11 +34,17 @@ public sealed class ContinueAsNewCarryoverEventsTests /// into a single NewEvents delivery, the pre-fix code lost every signal after the first. /// After the fix the full buffer is captured once all events are processed, so every /// signal survives as a carryover event and the workflow counts down to zero. + /// + /// The signal count is intentionally small (15) so that all signals can be fired + /// simultaneously — maximising the chance the sidecar batches several of them into + /// a single NewEvents delivery — while keeping the total wall-clock time well under + /// 30 seconds. Each ContinueAsNew iteration requires one sidecar round-trip; a larger + /// count (e.g. 250) makes the test take 2+ minutes and risks CI timeouts. /// [Fact] public async Task ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether() { - const int signalCount = 250; + const int signalCount = 15; var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); var workflowInstanceId = Guid.NewGuid().ToString(); @@ -86,16 +92,19 @@ await Task.WhenAll( // All signals must be consumed via carryover before the workflow completes. using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource( TestContext.Current.CancellationToken); - timeoutCts.CancelAfter(TimeSpan.FromMinutes(2)); + timeoutCts.CancelAfter(TimeSpan.FromSeconds(60)); var result = await client.WaitForWorkflowCompletionAsync( workflowInstanceId, cancellation: timeoutCts.Token); Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); - // Every index in [0, signalCount) must appear exactly once in the output — no drops, no duplicates. + // Every index in [0, signalCount) must appear exactly once — no drops, no duplicates. + // Order() sorts the received values; comparing against Range ensures complete coverage + // with the correct payload for each signal (not default(int)=0 from Input stripping). var receivedIndexes = result.ReadOutputAs>(); Assert.NotNull(receivedIndexes); + Assert.Equal(signalCount, receivedIndexes.Count); Assert.Equal(Enumerable.Range(0, signalCount), receivedIndexes.Order()); } diff --git a/test/Dapr.IntegrationTest.Workflow/ExternalEventCancellationTests.cs b/test/Dapr.IntegrationTest.Workflow/ExternalEventCancellationTests.cs index 8d3672de2..359ecbbeb 100644 --- a/test/Dapr.IntegrationTest.Workflow/ExternalEventCancellationTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/ExternalEventCancellationTests.cs @@ -25,7 +25,7 @@ public sealed class ExternalEventCancellationSequentialTests public async Task ExternalEvents_ShouldComplete_WhenRaisedSequentially_WithDelay() { await ExternalEventCancellationTestHarness.RunAsync( - workflowCount: 1000, + workflowCount: 50, raiseEventsInParallel: false, perEventDelay: TimeSpan.FromMilliseconds(75), initialWaitTimeout: TimeSpan.FromMilliseconds(200)); @@ -38,7 +38,7 @@ public sealed class ExternalEventCancellationParallelTests public async Task ExternalEvents_ShouldComplete_WhenRaisedInParallel_MinimalDelay() { await ExternalEventCancellationTestHarness.RunAsync( - workflowCount: 1000, + workflowCount: 50, raiseEventsInParallel: true, perEventDelay: TimeSpan.Zero, initialWaitTimeout: TimeSpan.FromMilliseconds(200)); @@ -93,7 +93,7 @@ public static async Task RunAsync( await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(CanceledWaitWorkflow), workflowId, initialWaitTimeout); } - using var waitCts = new CancellationTokenSource(TimeSpan.FromMinutes(3)); + using var waitCts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); await Task.WhenAll(workflowIds.Select(id => WaitForCustomStatusAsync(daprWorkflowClient, id, WaitingAfterTimeoutStatus, waitCts.Token))); @@ -113,7 +113,7 @@ await Task.WhenAll(workflowIds.Select(id => } } - using var completionCts = new CancellationTokenSource(TimeSpan.FromMinutes(3)); + using var completionCts = new CancellationTokenSource(TimeSpan.FromSeconds(60)); var results = await Task.WhenAll(workflowIds.Select(id => daprWorkflowClient.WaitForWorkflowCompletionAsync(id, cancellation: completionCts.Token))); diff --git a/test/Dapr.IntegrationTest.Workflow/WorkflowRpcTests.cs b/test/Dapr.IntegrationTest.Workflow/WorkflowRpcTests.cs index 2eab307b9..643113409 100644 --- a/test/Dapr.IntegrationTest.Workflow/WorkflowRpcTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/WorkflowRpcTests.cs @@ -13,6 +13,7 @@ using Dapr.Testcontainers.Common; using Dapr.Testcontainers.Harnesses; +using Dapr.Testcontainers.Xunit.Attributes; using Dapr.Workflow; using Dapr.Workflow.Client; using Microsoft.Extensions.Configuration; @@ -22,7 +23,7 @@ namespace Dapr.IntegrationTest.Workflow; public sealed class WorkflowRpcTests { - [Fact] + [MinimumDaprRuntimeFact("1.17.0")] public async Task ListInstanceIds_ShouldReturnScheduledWorkflowInstances() { var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); @@ -63,7 +64,7 @@ public async Task ListInstanceIds_ShouldReturnScheduledWorkflowInstances() Assert.Contains(instanceId, page.InstanceIds); } - [Fact] + [MinimumDaprRuntimeFact("1.17.0")] public async Task GetInstanceHistory_ShouldReturnHistoryForCompletedWorkflow() { var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); diff --git a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs index 43c0a61d3..d0d0cd0f0 100644 --- a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs +++ b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs @@ -1057,8 +1057,16 @@ public void CreateReplaySafeLogger_GenericOverload_ShouldReturnReplaySafeLogger( Assert.Equal(typeof(MyExampleType), innerLoggerType.GetGenericArguments()[0]); } + /// + /// When preserveUnprocessedEvents: true, FinalizeCarryoverEvents should emit + /// SendEvent actions to self for each buffered event so that the new execution + /// receives them via the sidecar's normal event queue path (with Input intact). + /// It must NOT populate CarryoverEvents on the ContinuedAsNew action because + /// the Dapr sidecar strips the Input field when re-delivering carryover events, + /// causing all payloads to deserialize as default(T). + /// [Fact] - public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithCarryoverEvents_WhenPreserveUnprocessedEventsIsTrue() + public void ContinueAsNew_ShouldEmitSendEventActionsForBufferedEvents_WhenPreserveUnprocessedEventsIsTrue() { var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); var tracker = new WorkflowVersionTracker([]); @@ -1071,30 +1079,119 @@ public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithCarryoverEven var context = new WorkflowOrchestrationContext( name: "wf", - instanceId: "i", + instanceId: "my-instance", currentUtcDateTime: new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), workflowSerializer: serializer, loggerFactory: NullLoggerFactory.Instance, tracker); - context.ProcessEvents(history, true); + context.ProcessEvents(history, isReplaying: true); context.ContinueAsNew(newInput: new { V = 9 }, preserveUnprocessedEvents: true); - // FinalizeCarryoverEvents must be called after all ProcessEvents calls are done; - // CarryoverEvents is populated here rather than inside ContinueAsNew so that events - // arriving later in the same NewEvents batch are not missed. context.FinalizeCarryoverEvents(); - Assert.Single(context.PendingActions); - var action = context.PendingActions.First(); + // Expect: 1 ContinuedAsNew + 2 SendEvent actions (one per buffered event). + Assert.Equal(3, context.PendingActions.Count); - Assert.NotNull(action.CompleteOrchestration); - Assert.Equal(OrchestrationStatus.ContinuedAsNew, action.CompleteOrchestration.OrchestrationStatus); - Assert.Contains("\"v\":9", action.CompleteOrchestration.Result); - Assert.Equal(2, action.CompleteOrchestration.CarryoverEvents.Count); + var continueAction = context.PendingActions.First(); + Assert.NotNull(continueAction.CompleteOrchestration); + Assert.Equal(OrchestrationStatus.ContinuedAsNew, continueAction.CompleteOrchestration.OrchestrationStatus); + Assert.Contains("\"v\":9", continueAction.CompleteOrchestration.Result); + // CarryoverEvents must be empty to avoid the sidecar's Input-stripping behaviour. + Assert.Empty(continueAction.CompleteOrchestration.CarryoverEvents); + + var sendActions = context.PendingActions.Skip(1).ToList(); + Assert.Equal(2, sendActions.Count); + Assert.All(sendActions, a => + { + Assert.NotNull(a.SendEvent); + Assert.Equal("my-instance", a.SendEvent.Instance.InstanceId); + }); + Assert.Contains(sendActions, a => a.SendEvent!.Name == "e1" && a.SendEvent.Data == "\"x\""); + Assert.Contains(sendActions, a => a.SendEvent!.Name == "e2" && a.SendEvent.Data == "\"y\""); } + /// + /// Validates the full carryover correctness at a unit-test level. + /// + /// When many signals arrive together in one NewEvents batch and the workflow processes + /// one per ContinueAsNew cycle, FinalizeCarryoverEvents must re-queue the remaining + /// signals via SendEvent so that the new execution receives them with their original + /// Input values preserved (not stripped to default(T) as CarryoverEvents would). + /// [Fact] - public void ContinueAsNew_ShouldNotCarryOverEvents_WhenPreserveUnprocessedEventsIsFalse() + public async Task ContinueAsNew_WithPreserveUnprocessedEvents_ShouldRequeueBufferedEventsViaSendEventWithCorrectValues() + { + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var tracker = new WorkflowVersionTracker([]); + + var allEvents = new[] + { + new HistoryEvent { EventRaised = new EventRaisedEvent { Name = "signal", Input = "1" } }, + new HistoryEvent { EventRaised = new EventRaisedEvent { Name = "signal", Input = "2" } }, + new HistoryEvent { EventRaised = new EventRaisedEvent { Name = "signal", Input = "3" } } + }; + + // ── First execution ────────────────────────────────────────────────────────── + var context1 = new WorkflowOrchestrationContext( + name: "wf", + instanceId: "my-instance", + currentUtcDateTime: new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), + workflowSerializer: serializer, + loggerFactory: NullLoggerFactory.Instance, + tracker); + + // Register a waiter before events arrive (workflow reaches its first await). + var firstEventTask = context1.WaitForExternalEventAsync("signal", TestContext.Current.CancellationToken); + + // All 3 events arrive in the same NewEvents batch; only the first is consumed. + context1.ProcessEvents(allEvents, isReplaying: false); + + Assert.True(firstEventTask.IsCompleted, "First WaitForExternalEventAsync should complete synchronously."); + Assert.Equal(1, await firstEventTask); + + // Workflow calls ContinueAsNew; FinalizeCarryoverEvents re-queues the 2 unconsumed events. + context1.ContinueAsNew(newInput: 1, preserveUnprocessedEvents: true); + context1.FinalizeCarryoverEvents(); + + // 1 ContinuedAsNew + 2 SendEvent (one per unconsumed event). + Assert.Equal(3, context1.PendingActions.Count); + + var continueAction = context1.PendingActions.First(); + Assert.NotNull(continueAction.CompleteOrchestration); + Assert.Equal(OrchestrationStatus.ContinuedAsNew, continueAction.CompleteOrchestration.OrchestrationStatus); + Assert.Empty(continueAction.CompleteOrchestration.CarryoverEvents); + + // SendEvent actions must carry the original serialized values, not null / default. + var sendActions = context1.PendingActions.Skip(1).ToList(); + Assert.Equal(2, sendActions.Count); + Assert.Contains(sendActions, a => a.SendEvent!.Name == "signal" && a.SendEvent.Data == "2"); + Assert.Contains(sendActions, a => a.SendEvent!.Name == "signal" && a.SendEvent.Data == "3"); + + // ── Second execution (simulates sidecar delivering the re-queued events) ───── + var tracker2 = new WorkflowVersionTracker([]); + var context2 = new WorkflowOrchestrationContext( + name: "wf", + instanceId: "my-instance", + currentUtcDateTime: new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), + workflowSerializer: serializer, + loggerFactory: NullLoggerFactory.Instance, + tracker2); + + var secondEventTask = context2.WaitForExternalEventAsync("signal", TestContext.Current.CancellationToken); + + // Sidecar delivers events 2 and 3 (re-queued via SendEvent); event 1 is not re-sent. + context2.ProcessEvents(new[] { allEvents[1], allEvents[2] }, isReplaying: false); + + Assert.True(secondEventTask.IsCompleted, "Second WaitForExternalEventAsync should complete synchronously."); + + // The critical assertion: the value must be 2, not 0/default. + // The original bug populated CarryoverEvents but the sidecar stripped Input, + // causing payloads to deserialize as 0. With the SendEvent approach the value is preserved. + Assert.Equal(2, await secondEventTask); + } + + [Fact] + public void ContinueAsNew_ShouldNotEmitSendEventActions_WhenPreserveUnprocessedEventsIsFalse() { var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); var tracker = new WorkflowVersionTracker([]); @@ -1113,9 +1210,11 @@ public void ContinueAsNew_ShouldNotCarryOverEvents_WhenPreserveUnprocessedEvents loggerFactory: NullLoggerFactory.Instance, tracker); - context.ProcessEvents(history, true); + context.ProcessEvents(history, isReplaying: true); context.ContinueAsNew(newInput: null, preserveUnprocessedEvents: false); + context.FinalizeCarryoverEvents(); + // Only the ContinuedAsNew action; no SendEvent actions because preserve is false. Assert.Single(context.PendingActions); var action = context.PendingActions.First();