Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ internal sealed class WorkflowOrchestrationContext : WorkflowContext
private DateTime _currentUtcDateTime;
private bool _isReplaying;
private bool _turnInitialized;
private bool _preserveUnprocessedEvents;

public WorkflowOrchestrationContext(string name, string instanceId, DateTime currentUtcDateTime,
IWorkflowSerializer workflowSerializer, ILoggerFactory loggerFactory, WorkflowVersionTracker versionTracker,
Expand Down Expand Up @@ -359,13 +360,33 @@ public override void ContinueAsNew(object? newInput = null, bool preserveUnproce
}
};

if (preserveUnprocessedEvents)
// Do NOT snapshot _externalEventBuffer here. ContinueAsNew is called from within
// workflow execution, which happens during ProcessEvents. Events arriving later in
// the same NewEvents batch will be buffered AFTER this point and would be missed.
// FinalizeCarryoverEvents() is called after all ProcessEvents calls are complete.
_preserveUnprocessedEvents = preserveUnprocessedEvents;
_pendingActions.Add(action.Id, action);
}

/// <summary>
/// Populates <c>CarryoverEvents</c> on the pending <c>ContinuedAsNew</c> action using the
/// final state of <c>_externalEventBuffer</c>. Must be called after all <c>ProcessEvents</c>
/// calls for the current turn are complete, so that events arriving later in the same
/// <c>NewEvents</c> batch are included.
/// </summary>
/// <remarks>
/// This is a no-op when <c>_preserveUnprocessedEvents</c> is <see langword="false"/> or when
/// the external event buffer is empty. Only the first pending action whose completion status
/// is <c>ContinuedAsNew</c> is updated; all other pending actions are left untouched.
/// The method does not guard against repeated invocation: each call that passes the checks
/// appends the buffer again, so callers should invoke it once per turn.
/// </remarks>
internal void FinalizeCarryoverEvents()
{
    if (!_preserveUnprocessedEvents || _externalEventBuffer.Count == 0)
    {
        return;
    }

    foreach (var action in _pendingActions.Values)
    {
        // Pending actions that are not orchestration completions have no CompleteOrchestration
        // payload, so the null-conditional check must come before any dereference.
        if (action.CompleteOrchestration?.OrchestrationStatus == OrchestrationStatus.ContinuedAsNew)
        {
            // Everything still in the buffer was never consumed via WaitForExternalEventAsync.
            action.CompleteOrchestration.CarryoverEvents.AddRange(_externalEventBuffer);
            return;
        }
    }
}

/// <inheritdoc />
Expand Down
5 changes: 5 additions & 0 deletions src/Dapr.Workflow/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,11 @@ private async Task<OrchestratorResponse> HandleOrchestratorResponseAsync(Orchest
context.ProcessEvents(request.NewEvents, false);
}

// Populate CarryoverEvents now that all events in this turn have been processed.
// ContinueAsNew cannot do this inline because it runs mid-ProcessEvents; events
// arriving later in the same NewEvents batch would be buffered after the snapshot.
context.FinalizeCarryoverEvents();

// If the history processing caused a stall (e.g. via OnOrchestratorStarted), return immediately
if (versionTracker.IsStalled)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Testcontainers.Common;
using Dapr.Testcontainers.Harnesses;
using Dapr.Workflow;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Dapr.IntegrationTest.Workflow;

/// <summary>
/// Regression coverage for dropped external events: when signals were delivered in the same
/// NewEvents batch as the turn that called ContinueAsNew, CarryoverEvents was captured while
/// ProcessEvents was still running, so events buffered afterwards never reached the next
/// iteration.
/// </summary>
public sealed class ContinueAsNewCarryoverEventsTests
{
    private const string SignalEventName = "signal";

    /// <summary>
    /// Fires many identically named signals concurrently at a workflow that consumes exactly
    /// one signal per ContinueAsNew iteration. If the sidecar groups those signals into one
    /// NewEvents delivery, every signal must still be carried over so that the workflow is
    /// able to count all the way down and complete with an output of zero.
    /// </summary>
    [Fact]
    public async Task ContinueAsNew_ShouldCarryOverEvents_WhenMultipleSignalsArriveTogether()
    {
        const int signalCount = 250;
        var testCancellation = TestContext.Current.CancellationToken;
        var componentsDirectory = TestDirectoryManager.CreateTestDirectory("workflow-components");
        var instanceId = Guid.NewGuid().ToString();

        await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(
            needsActorState: true,
            cancellationToken: testCancellation);
        await environment.StartAsync(testCancellation);

        var harness = new DaprHarnessBuilder(componentsDirectory)
            .WithEnvironment(environment)
            .BuildWorkflow();

        await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
            .ConfigureServices(appBuilder =>
            {
                appBuilder.Services.AddDaprWorkflowBuilder(
                    options => options.RegisterWorkflow<SignalCountdownWorkflow>(),
                    configureClient: (services, clientBuilder) =>
                    {
                        // Only override the endpoint when the harness supplies one.
                        var configuration = services.GetRequiredService<IConfiguration>();
                        var endpoint = configuration["DAPR_GRPC_ENDPOINT"];
                        if (!string.IsNullOrEmpty(endpoint))
                        {
                            clientBuilder.UseGrpcEndpoint(endpoint);
                        }
                    });
            })
            .BuildAndStartAsync();

        using var scope = testApp.CreateScope();
        var workflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

        // The workflow input is the number of signals it still has to consume, one per
        // ContinueAsNew cycle.
        await workflowClient.ScheduleNewWorkflowAsync(
            nameof(SignalCountdownWorkflow), instanceId, signalCount);

        // Pause briefly so the first WaitForExternalEventAsync is registered, then raise every
        // signal at once. Raising them together makes it more likely that the sidecar delivers
        // several in a single NewEvents batch, which is the scenario under test.
        await Task.Delay(TimeSpan.FromMilliseconds(500), testCancellation);

        var pendingSignals = Enumerable.Range(0, signalCount)
            .Select(_ => workflowClient.RaiseEventAsync(
                instanceId, SignalEventName, "tick", testCancellation));
        await Task.WhenAll(pendingSignals);

        // Bound the wait so a lost signal fails the test instead of hanging it.
        using var completionTimeout = CancellationTokenSource.CreateLinkedTokenSource(testCancellation);
        completionTimeout.CancelAfter(TimeSpan.FromMinutes(2));

        var finalState = await workflowClient.WaitForWorkflowCompletionAsync(
            instanceId, cancellation: completionTimeout.Token);

        Assert.Equal(WorkflowRuntimeStatus.Completed, finalState.RuntimeStatus);
        Assert.Equal(0, finalState.ReadOutputAs<int>());
    }

    /// <summary>
    /// Countdown workflow: while <c>remaining</c> is positive it waits for one "signal"
    /// external event and then continues as new with <c>remaining - 1</c> and
    /// <c>preserveUnprocessedEvents: true</c>; once <c>remaining</c> is zero or less it
    /// returns zero.
    /// </summary>
    private sealed class SignalCountdownWorkflow : Workflow<int, int>
    {
        public override async Task<int> RunAsync(WorkflowContext context, int remaining)
        {
            if (remaining > 0)
            {
                await context.WaitForExternalEventAsync<string>(SignalEventName);
                context.ContinueAsNew(remaining - 1, preserveUnprocessedEvents: true);
                return remaining;
            }

            return 0;
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,10 @@ public void ContinueAsNew_ShouldAddCompleteOrchestrationAction_WithCarryoverEven

context.ProcessEvents(history, true);
context.ContinueAsNew(newInput: new { V = 9 }, preserveUnprocessedEvents: true);
// FinalizeCarryoverEvents must be called after all ProcessEvents calls are done;
// CarryoverEvents is populated here rather than inside ContinueAsNew so that events
// arriving later in the same NewEvents batch are not missed.
context.FinalizeCarryoverEvents();

Assert.Single(context.PendingActions);
var action = context.PendingActions.First();
Expand Down
Loading