Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/Dapr.Workflow/Worker/WorkflowWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,12 @@ private async Task<OrchestratorResponse> HandleOrchestratorResponseAsync(Orchest
: _serializer.Deserialize(serializedInput, workflow!.InputType);

// Initialize per-turn state before any workflow code runs.
context.InitializeNewTurn(currentTurnTimestamp);
// On first execution (no past events) use the current turn's OrchestratorStarted timestamp.
// On replay use the first past event's timestamp so the workflow sees the same
// CurrentUtcDateTime at its start as it did on the very first execution.
// ProcessEvents will advance the clock naturally via OrchestratorStarted history events.
var initialTimestamp = allPastEvents.Count > 0 ? currentUtcDateTime : currentTurnTimestamp;
context.InitializeNewTurn(initialTimestamp);
context.SetReplayState(allPastEvents.Count > 0);

// Execute the workflow
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
// ------------------------------------------------------------------------
// Copyright 2026 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using Dapr.Testcontainers.Common;
using Dapr.Testcontainers.Harnesses;
using Dapr.Workflow;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Dapr.IntegrationTest.Workflow;

/// <summary>
/// Proves that <see cref="WorkflowContext.CurrentUtcDateTime"/> is deterministic and
/// monotonically non-decreasing across workflow replays.
///
/// <para>
/// https://github.com/dapr/dotnet-sdk/issues/1764
/// Root cause of the bug: on every replay the Dapr worker initialised
/// <c>_currentUtcDateTime</c> with the <em>current turn's</em>
/// <c>OrchestratorStarted</c> timestamp before running any workflow code.
/// Because the workflow code runs from the top on each replay, any read of
/// <c>CurrentUtcDateTime</c> before the first <c>await</c> would return the latest
/// turn's timestamp instead of the initial turn's timestamp — making the value
/// inconsistent (and often <em>later</em> than timestamps captured after the first
/// activity completed).
/// </para>
/// </summary>
public sealed class CurrentUtcDateTimeConsistencyTests
{
/// <summary>
/// Schedules a workflow that calls two sequential activities and captures
/// <c>CurrentUtcDateTime</c> at three checkpoints:
/// <list type="number">
/// <item>Before the first activity (before any <c>await</c>)</item>
/// <item>After the first activity</item>
/// <item>After the second activity</item>
/// </list>
/// Each activity introduces a small delay so that Dapr assigns a strictly later
/// <c>OrchestratorStarted</c> timestamp for the subsequent turn, making the
/// timestamps observably distinct.
///
/// Without the fix, checkpoint (1) returns the <em>third</em> turn's timestamp
/// while checkpoint (2) returns the <em>second</em> turn's timestamp, so the
/// sequence is not monotonically non-decreasing and the assertion fails.
/// </summary>
[Fact]
public async Task CurrentUtcDateTime_IsMonotonicallyNonDecreasing_AcrossReplays()
{
var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components");
var workflowInstanceId = Guid.NewGuid().ToString();

await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync(
needsActorState: true, cancellationToken: TestContext.Current.CancellationToken);
await environment.StartAsync(TestContext.Current.CancellationToken);

var harness = new DaprHarnessBuilder(componentsDir)
.WithEnvironment(environment)
.BuildWorkflow();
await using var testApp = await DaprHarnessBuilder.ForHarness(harness)
.ConfigureServices(builder =>
{
builder.Services.AddDaprWorkflowBuilder(
opt =>
{
opt.RegisterWorkflow<TimestampCaptureWorkflow>();
opt.RegisterActivity<DelayActivity>();
},
configureClient: (sp, cb) =>
{
var config = sp.GetRequiredService<IConfiguration>();
var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"];
if (!string.IsNullOrEmpty(grpcEndpoint))
cb.UseGrpcEndpoint(grpcEndpoint);
});
})
.BuildAndStartAsync();

using var scope = testApp.CreateScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();

await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(TimestampCaptureWorkflow), workflowInstanceId);
var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync(
workflowInstanceId, true, TestContext.Current.CancellationToken);

Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus);

var output = result.ReadOutputAs<TimestampOutput>();
Assert.NotNull(output);

// All three timestamps must be valid UTC.
Assert.Equal(DateTimeKind.Utc, output.BeforeFirstActivity.Kind);
Assert.Equal(DateTimeKind.Utc, output.AfterFirstActivity.Kind);
Assert.Equal(DateTimeKind.Utc, output.AfterSecondActivity.Kind);

// CurrentUtcDateTime must be monotonically non-decreasing across replays.
//
// Without the fix, BeforeFirstActivity would show the third turn's
// OrchestratorStarted timestamp (the most recent one when the workflow
// code ran on the final replay), which is later than AfterFirstActivity
// (second turn's timestamp). That would make the sequence:
// BeforeFirstActivity = T3 > AfterFirstActivity = T2 ← wrong
//
// With the fix, the workflow always starts with T1 (the initial turn's
// timestamp) regardless of which replay is currently executing:
// T1 ≤ T2 ≤ T3 ✓
Assert.True(
output.BeforeFirstActivity <= output.AfterFirstActivity,
$"CurrentUtcDateTime went backwards across a replay boundary: " +
$"before-first-activity={output.BeforeFirstActivity:O} is later than " +
$"after-first-activity={output.AfterFirstActivity:O}");

Assert.True(
output.AfterFirstActivity <= output.AfterSecondActivity,
$"CurrentUtcDateTime went backwards: " +
$"after-first-activity={output.AfterFirstActivity:O} is later than " +
$"after-second-activity={output.AfterSecondActivity:O}");
}

/// <summary>
/// The three <see cref="WorkflowContext.CurrentUtcDateTime"/> snapshots captured
/// by <see cref="TimestampCaptureWorkflow"/>.
/// </summary>
private sealed class TimestampOutput
{
public DateTime BeforeFirstActivity { get; init; }
public DateTime AfterFirstActivity { get; init; }
public DateTime AfterSecondActivity { get; init; }
}

/// <summary>
/// Activity that sleeps for <paramref name="input"/> milliseconds so that Dapr
/// records a later <c>OrchestratorStarted</c> timestamp for the next turn.
/// </summary>
private sealed class DelayActivity : WorkflowActivity<int, string>
{
public override async Task<string> RunAsync(WorkflowActivityContext context, int input)
{
await Task.Delay(input);
return "done";
}
}

/// <summary>
/// Calls two activities in sequence, capturing <see cref="WorkflowContext.CurrentUtcDateTime"/>
/// before the first activity and after each activity.
/// </summary>
private sealed class TimestampCaptureWorkflow : Workflow<object?, TimestampOutput>
{
public override async Task<TimestampOutput> RunAsync(WorkflowContext context, object? input)
{
// Read before any await. On replay this is the value that was wrong:
// the bug caused it to reflect the *current* turn's start time rather
// than the *initial* turn's start time.
var beforeFirstActivity = context.CurrentUtcDateTime;

// 100 ms delays ensure each activity completes in a measurably later
// Dapr turn, giving T1 < T2 < T3 with high confidence.
await context.CallActivityAsync<string>(nameof(DelayActivity), 100);
var afterFirstActivity = context.CurrentUtcDateTime;

await context.CallActivityAsync<string>(nameof(DelayActivity), 100);
var afterSecondActivity = context.CurrentUtcDateTime;

return new TimestampOutput
{
BeforeFirstActivity = beforeFirstActivity,
AfterFirstActivity = afterFirstActivity,
AfterSecondActivity = afterSecondActivity
};
}
}
}
103 changes: 103 additions & 0 deletions test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,109 @@ public async Task HandleOrchestratorResponseAsync_ShouldAdvanceCurrentUtcDateTim
Assert.Equal(string.Empty, complete.Result);
}

/// <summary>
/// Regression test: CurrentUtcDateTime must equal the workflow's initial start time before the first
/// await on every replay, not the current turn's timestamp.
///
/// The bug: WorkflowWorker initialised _currentUtcDateTime with the *current turn's*
/// OrchestratorStarted timestamp (T3) instead of the *first* history event's timestamp (T1).
/// The workflow code ran before ProcessEvents and read the wrong time.
/// </summary>
[Fact]
public async Task HandleOrchestratorResponseAsync_CurrentUtcDateTime_IsConsistentBeforeFirstAwait_OnReplay()
{
var sp = new ServiceCollection().BuildServiceProvider();
var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web));
var options = new WorkflowRuntimeOptions();

var t1 = new DateTime(2025, 01, 01, 12, 0, 0, DateTimeKind.Utc); // workflow started
var t2 = t1.AddSeconds(5); // activity completed
var t3 = t2.AddSeconds(5); // current turn start

DateTime capturedBeforeAwait = default;
DateTime capturedAfterActivityAwait = default;

var factory = new StubWorkflowsFactory();
factory.AddWorkflow("wf", new InlineWorkflow(
inputType: typeof(object),
run: async (ctx, _) =>
{
capturedBeforeAwait = ctx.CurrentUtcDateTime; // must equal T1 on every replay
await ctx.CallActivityAsync<string>("act");
capturedAfterActivityAwait = ctx.CurrentUtcDateTime; // must equal T2
return null;
}));
factory.AddActivity("act", new InlineActivity(
inputType: typeof(object),
run: (_, _) => Task.FromResult<object?>("result")));

var worker = new WorkflowWorker(
CreateGrpcClientMock().Object,
factory,
NullLoggerFactory.Instance,
serializer,
sp,
options);

// Simulate a replay turn: PastEvents contain the first turn's history (activity scheduled
// and completed), NewEvents hold the current turn's OrchestratorStarted at the later time T3.
// Before the fix, CurrentUtcDateTime before the first await would be T3, not T1.
var request = new OrchestratorRequest
{
InstanceId = "i",
PastEvents =
{
new HistoryEvent
{
Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(t1),
ExecutionStarted = new ExecutionStartedEvent { Name = "wf" }
},
new HistoryEvent
{
EventId = 0,
TaskScheduled = new TaskScheduledEvent { Name = "act" }
},
new HistoryEvent
{
Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(t2),
OrchestratorStarted = new OrchestratorStartedEvent()
},
new HistoryEvent
{
TaskCompleted = new TaskCompletedEvent
{
TaskScheduledId = 0,
Result = "\"result\""
}
}
},
NewEvents =
{
// Current turn starts at T3 — this is what the bug incorrectly used as
// the initial CurrentUtcDateTime before any workflow code ran.
new HistoryEvent
{
Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(t3),
OrchestratorStarted = new OrchestratorStartedEvent()
}
}
};

var response = await InvokeHandleOrchestratorResponseAsync(worker, request);

Assert.Equal("i", response.InstanceId);
var complete = response.Actions.Single(a => a.CompleteOrchestration != null).CompleteOrchestration!;
Assert.Equal(OrchestrationStatus.Completed, complete.OrchestrationStatus);

// Before the fix this was T3 (the current turn's timestamp). It must be T1 so that
// the value the workflow observes before its first await is consistent across replays.
Assert.Equal(t1, capturedBeforeAwait);

// After the activity completes the clock should have advanced to T2, as recorded
// by the OrchestratorStarted event that preceded the TaskCompleted event.
Assert.Equal(t2, capturedAfterActivityAwait);
}

[Fact]
public async Task HandleOrchestratorResponseAsync_ShouldCompleted_WhenEventReceived()
{
Expand Down
Loading