diff --git a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs index aa67fc853..94dc0d7d8 100644 --- a/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs +++ b/src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs @@ -73,6 +73,8 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext private DateTime _currentUtcDateTime; private bool _isReplaying; private bool _turnInitialized; + private bool _preserveUnprocessedEvents; + public WorkflowOrchestrationContext(string name, string instanceId, DateTime currentUtcDateTime, IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker, string? appId = null, string? executionId = null) @@ -358,23 +360,33 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce } }; - // The Dapr sidecar preserves unprocessed events via its own pending-event queue and - // re-delivers them to the new execution automatically. Using the gRPC CarryoverEvents - // field causes double-delivery (once from CarryoverEvents, once from the pending queue) - // and the sidecar strips the Input field from CarryoverEvents (causing default-value - // payloads). We therefore rely on Dapr's natural re-delivery and ignore preserveUnprocessedEvents. + // Do NOT snapshot _externalEventBuffer here. ContinueAsNew is called from within + // workflow execution, which happens during ProcessEvents. Events arriving later in + // the same NewEvents batch will be buffered AFTER this point and would be missed. + // FinalizeCarryoverEvents() is called after all ProcessEvents calls are complete. 
+ _preserveUnprocessedEvents = preserveUnprocessedEvents; _pendingActions.Add(action.Id, action); } /// - /// No-op: the Dapr sidecar automatically re-delivers unprocessed events to the new - /// execution after ContinuedAsNew via its own persistent event queue, so we do not - /// need to populate CarryoverEvents on the gRPC action. Doing so causes - /// double-delivery with a stripped Input field. + /// Populates CarryoverEvents on any pending ContinuedAsNew action using the + /// final state of _externalEventBuffer. Must be called after all ProcessEvents + /// calls for the current turn are complete, so that events arriving later in the same + /// NewEvents batch are included. /// internal void FinalizeCarryoverEvents() { - // Intentionally empty – see comment above. + if (!_preserveUnprocessedEvents || _externalEventBuffer.Count == 0) + return; + + foreach (var action in _pendingActions.Values) + { + if (action.CompleteOrchestration?.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew) + { + action.CompleteOrchestration.CarryoverEvents.AddRange(_externalEventBuffer); + return; + } + } } /// diff --git a/test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs b/test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs index 6a2fb6767..041f6ca0d 100644 --- a/test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs +++ b/test/Dapr.IntegrationTest.Workflow/ContinueAsNewCarryoverEventsTests.cs @@ -34,17 +34,11 @@ public sealed class ContinueAsNewCarryoverEventsTests /// into a single NewEvents delivery, the pre-fix code lost every signal after the first. /// After the fix the full buffer is captured once all events are processed, so every /// signal survives as a carryover event and the workflow counts down to zero. 
- /// - /// The signal count is intentionally small (15) so that all signals can be fired - /// simultaneously — maximising the chance the sidecar batches several of them into - /// a single NewEvents delivery — while keeping the total wall-clock time well under - /// 30 seconds. Each ContinueAsNew iteration requires one sidecar round-trip; a larger - /// count (e.g. 250) makes the test take 2+ minutes and risks CI timeouts. /// [Fact] public async Task ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether() { - const int signalCount = 15; + const int signalCount = 250; var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); var workflowInstanceId = Guid.NewGuid().ToString(); @@ -92,19 +86,16 @@ await Task.WhenAll( // All signals must be consumed via carryover before the workflow completes. using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource( TestContext.Current.CancellationToken); - timeoutCts.CancelAfter(TimeSpan.FromSeconds(60)); + timeoutCts.CancelAfter(TimeSpan.FromMinutes(2)); var result = await client.WaitForWorkflowCompletionAsync( workflowInstanceId, cancellation: timeoutCts.Token); Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); - // Every index in [0, signalCount) must appear exactly once — no drops, no duplicates. - // Order() sorts the received values; comparing against Range ensures complete coverage - // with the correct payload for each signal (not default(int)=0 from Input stripping). + // Every index in [0, signalCount) must appear exactly once in the output — no drops, no duplicates. 
var receivedIndexes = result.ReadOutputAs<List<int>>(); Assert.NotNull(receivedIndexes); - Assert.Equal(signalCount, receivedIndexes.Count); Assert.Equal(Enumerable.Range(0, signalCount), receivedIndexes.Order()); } diff --git a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs index 48f5ab365..43c0a61d3 100644 --- a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs +++ b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs @@ -1058,7 +1058,7 @@ public void CreateReplaySafeLogger_GenericOverload_ShouldReturnReplaySafeLogger( } [Fact] - public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithNoCarryoverEvents_WhenPreserveUnprocessedEventsIsTrue() + public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithCarryoverEvents_WhenPreserveUnprocessedEventsIsTrue() { var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); var tracker = new WorkflowVersionTracker([]); @@ -1079,6 +1079,9 @@ public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithNoCarryoverEv context.ProcessEvents(history, true); context.ContinueAsNew(newInput: new { V = 9 }, preserveUnprocessedEvents: true); + // FinalizeCarryoverEvents must be called after all ProcessEvents calls are done; + // CarryoverEvents is populated here rather than inside ContinueAsNew so that events + // arriving later in the same NewEvents batch are not missed. 
context.FinalizeCarryoverEvents(); Assert.Single(context.PendingActions); @@ -1087,82 +1090,7 @@ public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithNoCarryoverEv Assert.NotNull(action.CompleteOrchestration); Assert.Equal(OrchestrationStatus.ContinuedAsNew, action.CompleteOrchestration.OrchestrationStatus); Assert.Contains("\"v\":9", action.CompleteOrchestration.Result); - // CarryoverEvents is intentionally empty: Dapr's pending-event queue re-delivers - // unprocessed events to the new execution automatically, without needing the SDK - // to populate this field (which would cause double-delivery with a stripped Input). - Assert.Empty(action.CompleteOrchestration.CarryoverEvents); - } - - /// - /// Validates the full carryover correctness at a unit-test level. - /// - /// The Dapr sidecar re-delivers unconsumed events to new executions via its own - /// persistent event queue. This test simulates that re-delivery: the SDK correctly - /// delivers buffered events with their original, non-null input values in the next - /// execution. This is the unit-test equivalent of the end-to-end validation done - /// by ContinueAsNewCarryoverEventsTests. 
- /// - [Fact] - public async Task ContinueAsNew_WithPreserveUnprocessedEvents_ShouldDeliverBufferedEventsWithCorrectValuesInNextExecution() - { - var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); - var tracker = new WorkflowVersionTracker([]); - - var allEvents = new[] - { - new HistoryEvent { EventRaised = new EventRaisedEvent { Name = "signal", Input = "1" } }, - new HistoryEvent { EventRaised = new EventRaisedEvent { Name = "signal", Input = "2" } }, - new HistoryEvent { EventRaised = new EventRaisedEvent { Name = "signal", Input = "3" } } - }; - - // ── First execution ────────────────────────────────────────────────────────── - var context1 = new WorkflowOrchestrationContext( - name: "wf", - instanceId: "i", - currentUtcDateTime: new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), - workflowSerializer: serializer, - loggerFactory: NullLoggerFactory.Instance, - tracker); - - // Register a waiter before events arrive (workflow reaches its first await). - var firstEventTask = context1.WaitForExternalEventAsync<int>("signal", TestContext.Current.CancellationToken); - - // All 3 events arrive in the same NewEvents batch; only the first is consumed. - context1.ProcessEvents(allEvents, isReplaying: false); - - Assert.True(firstEventTask.IsCompleted, "First WaitForExternalEventAsync should complete synchronously."); - Assert.Equal(1, await firstEventTask); - - // Workflow calls ContinueAsNew; CarryoverEvents must remain empty. 
- context1.ContinueAsNew(newInput: 1, preserveUnprocessedEvents: true); - context1.FinalizeCarryoverEvents(); - - var continueAction = context1.PendingActions.First(); - Assert.NotNull(continueAction.CompleteOrchestration); - Assert.Equal(OrchestrationStatus.ContinuedAsNew, continueAction.CompleteOrchestration.OrchestrationStatus); - Assert.Empty(continueAction.CompleteOrchestration.CarryoverEvents); - - // ── Second execution (simulates sidecar re-delivering events 2 and 3) ──────── - var tracker2 = new WorkflowVersionTracker([]); - var context2 = new WorkflowOrchestrationContext( - name: "wf", - instanceId: "i", - currentUtcDateTime: new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc), - workflowSerializer: serializer, - loggerFactory: NullLoggerFactory.Instance, - tracker2); - - var secondEventTask = context2.WaitForExternalEventAsync<int>("signal", TestContext.Current.CancellationToken); - - // Sidecar re-delivers only the unconsumed events (2 and 3); event 1 is not re-sent. - context2.ProcessEvents(new[] { allEvents[1], allEvents[2] }, isReplaying: false); - - Assert.True(secondEventTask.IsCompleted, "Second WaitForExternalEventAsync should complete synchronously."); - - // The critical assertion: the value must be 2, not 0/default. - // The original bug populated CarryoverEvents but the sidecar stripped Input, - // causing double-delivery with null inputs that deserialized as 0. - Assert.Equal(2, await secondEventTask); + Assert.Equal(2, action.CompleteOrchestration.CarryoverEvents.Count); } [Fact]