Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 22 additions & 10 deletions src/Dapr.Workflow/Worker/Internal/WorkflowOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext
private DateTime _currentUtcDateTime;
private bool _isReplaying;
private bool _turnInitialized;
private bool _preserveUnprocessedEvents;

public WorkflowOrchestrationContext(string name, string instanceId, DateTime currentUtcDateTime,
IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker,
string? appId = null, string? executionId = null)
Expand Down Expand Up @@ -358,23 +360,33 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce
}
};

// The Dapr sidecar preserves unprocessed events via its own pending-event queue and
// re-delivers them to the new execution automatically. Using the gRPC CarryoverEvents
// field causes double-delivery (once from CarryoverEvents, once from the pending queue)
// and the sidecar strips the Input field from CarryoverEvents (causing default-value
// payloads). We therefore rely on Dapr's natural re-delivery and ignore preserveUnprocessedEvents.
// Do NOT snapshot _externalEventBuffer here. ContinueAsNew is called from within
// workflow execution, which happens during ProcessEvents. Events arriving later in
// the same NewEvents batch will be buffered AFTER this point and would be missed.
// FinalizeCarryoverEvents() is called after all ProcessEvents calls are complete.
_preserveUnprocessedEvents = preserveUnprocessedEvents;
_pendingActions.Add(action.Id, action);
}

/// <summary>
/// No-op: the Dapr sidecar automatically re-delivers unprocessed events to the new
/// execution after ContinuedAsNew via its own persistent event queue, so we do not
/// need to populate <c>CarryoverEvents</c> on the gRPC action. Doing so causes
/// double-delivery with a stripped <c>Input</c> field.
/// Populates <c>CarryoverEvents</c> on any pending <c>ContinuedAsNew</c> action using the
/// final state of <c>_externalEventBuffer</c>. Must be called after all <c>ProcessEvents</c>
/// calls for the current turn are complete, so that events arriving later in the same
/// <c>NewEvents</c> batch are included.
/// </summary>
internal void FinalizeCarryoverEvents()
{
// Intentionally empty – see comment above.
if (!_preserveUnprocessedEvents || _externalEventBuffer.Count == 0)
return;

foreach (var action in _pendingActions.Values)
{
if (action.CompleteOrchestration?.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
{
action.CompleteOrchestration.CarryoverEvents.AddRange(_externalEventBuffer);
return;
}
}
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,11 @@ public sealed class ContinueAsNewCarryoverEventsTests
/// into a single NewEvents delivery, the pre-fix code lost every signal after the first.
/// After the fix the full buffer is captured once all events are processed, so every
/// signal survives as a carryover event and the workflow counts down to zero.
///
/// The signal count is intentionally small (15) so that all signals can be fired
/// simultaneously — maximising the chance the sidecar batches several of them into
/// a single NewEvents delivery — while keeping the total wall-clock time well under
/// 30 seconds. Each ContinueAsNew iteration requires one sidecar round-trip; a larger
/// count (e.g. 250) makes the test take 2+ minutes and risks CI timeouts.
/// </summary>
[Fact]
public async Task ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether()
{
const int signalCount = 15;
const int signalCount = 250;
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

Expand Down Expand Up @@ -92,19 +86,16 @@ await Task.WhenAll(
// All signals must be consumed via carryover before the workflow completes.
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(
TestContext.Current.CancellationToken);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(60));
timeoutCts.CancelAfter(TimeSpan.FromMinutes(2));

var result = await client.WaitForWorkflowCompletionAsync(
workflowInstanceId, cancellation: timeoutCts.Token);

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);

// Every index in [0, signalCount) must appear exactly once — no drops, no duplicates.
// Order() sorts the received values; comparing against Range ensures complete coverage
// with the correct payload for each signal (not default(int)=0 from Input stripping).
// Every index in [0, signalCount) must appear exactly once in the output — no drops, no duplicates.
var receivedIndexes = result.ReadOutputAs<List<int>>();
Assert.NotNull(receivedIndexes);
Assert.Equal(signalCount, receivedIndexes.Count);
Assert.Equal(Enumerable.Range(0, signalCount), receivedIndexes.Order());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ public void CreateReplaySafeLogger_GenericOverload_ShouldReturnReplaySafeLogger(
}

[Fact]
public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithNoCarryoverEvents_WhenPreserveUnprocessedEventsIsTrue()
public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithCarryoverEvents_WhenPreserveUnprocessedEventsIsTrue()
{
var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web));
var tracker = new WorkflowVersionTracker([]);
Expand All @@ -1079,6 +1079,9 @@ public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithNoCarryoverEv

context.ProcessEvents(history, true);
context.ContinueAsNew(newInput: new { V = 9 }, preserveUnprocessedEvents: true);
// FinalizeCarryoverEvents must be called after all ProcessEvents calls are done;
// CarryoverEvents is populated here rather than inside ContinueAsNew so that events
// arriving later in the same NewEvents batch are not missed.
context.FinalizeCarryoverEvents();

Assert.Single(context.PendingActions);
Expand All @@ -1087,82 +1090,7 @@ public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithNoCarryoverEv
Assert.NotNull(action.CompleteOrchestration);
Assert.Equal(OrchestrationStatus.ContinuedAsNew, action.CompleteOrchestration.OrchestrationStatus);
Assert.Contains("\"v\":9", action.CompleteOrchestration.Result);
// CarryoverEvents is intentionally empty: Dapr's pending-event queue re-delivers
// unprocessed events to the new execution automatically, without needing the SDK
// to populate this field (which would cause double-delivery with a stripped Input).
Assert.Empty(action.CompleteOrchestration.CarryoverEvents);
}

/// <summary>
/// Validates the full carryover correctness at a unit-test level.
///
/// The Dapr sidecar re-delivers unconsumed events to new executions via its own
/// persistent event queue. This test simulates that re-delivery: the SDK correctly
/// delivers buffered events with their original, non-null input values in the next
/// execution. This is the unit-test equivalent of the end-to-end validation done
/// by <c>ContinueAsNewCarryoverEventsTests</c>.
/// </summary>
[Fact]
public async Task ContinueAsNew_WithPreserveUnprocessedEvents_ShouldDeliverBufferedEventsWithCorrectValuesInNextExecution()
{
var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web));
var tracker = new WorkflowVersionTracker([]);

var allEvents = new[]
{
new HistoryEvent { EventRaised = new EventRaisedEvent { Name = "signal", Input = "1" } },
new HistoryEvent { EventRaised = new EventRaisedEvent { Name = "signal", Input = "2" } },
new HistoryEvent { EventRaised = new EventRaisedEvent { Name = "signal", Input = "3" } }
};

// ── First execution ──────────────────────────────────────────────────────────
var context1 = new WorkflowOrchestrationContext(
name: "wf",
instanceId: "i",
currentUtcDateTime: new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc),
workflowSerializer: serializer,
loggerFactory: NullLoggerFactory.Instance,
tracker);

// Register a waiter before events arrive (workflow reaches its first await).
var firstEventTask = context1.WaitForExternalEventAsync<int>("signal", TestContext.Current.CancellationToken);

// All 3 events arrive in the same NewEvents batch; only the first is consumed.
context1.ProcessEvents(allEvents, isReplaying: false);

Assert.True(firstEventTask.IsCompleted, "First WaitForExternalEventAsync should complete synchronously.");
Assert.Equal(1, await firstEventTask);

// Workflow calls ContinueAsNew; CarryoverEvents must remain empty.
context1.ContinueAsNew(newInput: 1, preserveUnprocessedEvents: true);
context1.FinalizeCarryoverEvents();

var continueAction = context1.PendingActions.First();
Assert.NotNull(continueAction.CompleteOrchestration);
Assert.Equal(OrchestrationStatus.ContinuedAsNew, continueAction.CompleteOrchestration.OrchestrationStatus);
Assert.Empty(continueAction.CompleteOrchestration.CarryoverEvents);

// ── Second execution (simulates sidecar re-delivering events 2 and 3) ────────
var tracker2 = new WorkflowVersionTracker([]);
var context2 = new WorkflowOrchestrationContext(
name: "wf",
instanceId: "i",
currentUtcDateTime: new DateTime(2025, 01, 01, 0, 0, 0, DateTimeKind.Utc),
workflowSerializer: serializer,
loggerFactory: NullLoggerFactory.Instance,
tracker2);

var secondEventTask = context2.WaitForExternalEventAsync<int>("signal", TestContext.Current.CancellationToken);

// Sidecar re-delivers only the unconsumed events (2 and 3); event 1 is not re-sent.
context2.ProcessEvents(new[] { allEvents[1], allEvents[2] }, isReplaying: false);

Assert.True(secondEventTask.IsCompleted, "Second WaitForExternalEventAsync should complete synchronously.");

// The critical assertion: the value must be 2, not 0/default.
// The original bug populated CarryoverEvents but the sidecar stripped Input,
// causing double-delivery with null inputs that deserialized as 0.
Assert.Equal(2, await secondEventTask);
Assert.Equal(2, action.CompleteOrchestration.CarryoverEvents.Count);
}

[Fact]
Expand Down
Loading