diff --git a/src/Dapr.Workflow/Worker/WorkflowWorker.cs b/src/Dapr.Workflow/Worker/WorkflowWorker.cs index 5069ee1c5..8ba7c17fb 100644 --- a/src/Dapr.Workflow/Worker/WorkflowWorker.cs +++ b/src/Dapr.Workflow/Worker/WorkflowWorker.cs @@ -318,7 +318,12 @@ private async Task HandleOrchestratorResponseAsync(Orchest : _serializer.Deserialize(serializedInput, workflow!.InputType); // Initialize per-turn state before any workflow code runs. - context.InitializeNewTurn(currentTurnTimestamp); + // On first execution (no past events) use the current turn's OrchestratorStarted timestamp. + // On replay use the first past event's timestamp so the workflow sees the same + // CurrentUtcDateTime at its start as it did on the very first execution. + // ProcessEvents will advance the clock naturally via OrchestratorStarted history events. + var initialTimestamp = allPastEvents.Count > 0 ? currentUtcDateTime : currentTurnTimestamp; + context.InitializeNewTurn(initialTimestamp); context.SetReplayState(allPastEvents.Count > 0); // Execute the workflow diff --git a/test/Dapr.IntegrationTest.Workflow/CurrentUtcDateTimeConsistencyTests.cs b/test/Dapr.IntegrationTest.Workflow/CurrentUtcDateTimeConsistencyTests.cs new file mode 100644 index 000000000..9ca126d48 --- /dev/null +++ b/test/Dapr.IntegrationTest.Workflow/CurrentUtcDateTimeConsistencyTests.cs @@ -0,0 +1,182 @@ +// ------------------------------------------------------------------------ +// Copyright 2026 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using Dapr.Testcontainers.Common; +using Dapr.Testcontainers.Harnesses; +using Dapr.Workflow; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; + +namespace Dapr.IntegrationTest.Workflow; + +/// +/// Proves that is deterministic and +/// monotonically non-decreasing across workflow replays. +/// +/// +/// https://github.com/dapr/dotnet-sdk/issues/1764 +/// Root cause of the bug: on every replay the Dapr worker initialised +/// _currentUtcDateTime with the current turn's +/// OrchestratorStarted timestamp before running any workflow code. +/// Because the workflow code runs from the top on each replay, any read of +/// CurrentUtcDateTime before the first await would return the latest +/// turn's timestamp instead of the initial turn's timestamp — making the value +/// inconsistent (and often later than timestamps captured after the first +/// activity completed). +/// +/// +public sealed class CurrentUtcDateTimeConsistencyTests +{ + /// + /// Schedules a workflow that calls two sequential activities and captures + /// CurrentUtcDateTime at three checkpoints: + /// + /// Before the first activity (before any await) + /// After the first activity + /// After the second activity + /// + /// Each activity introduces a small delay so that Dapr assigns a strictly later + /// OrchestratorStarted timestamp for the subsequent turn, making the + /// timestamps observably distinct. + /// + /// Without the fix, checkpoint (1) returns the third turn's timestamp + /// while checkpoint (2) returns the second turn's timestamp, so the + /// sequence is not monotonically non-decreasing and the assertion fails. + /// + [Fact] + public async Task CurrentUtcDateTime_IsMonotonicallyNonDecreasing_AcrossReplays() + { + var componentsDir = TestDirectoryManager.CreateTestDirectory("workflow-components"); + var workflowInstanceId = Guid.NewGuid().ToString(); + + await using var environment = await DaprTestEnvironment.CreateWithPooledNetworkAsync( + needsActorState: true, cancellationToken: TestContext.Current.CancellationToken); + await environment.StartAsync(TestContext.Current.CancellationToken); + + var harness = new DaprHarnessBuilder(componentsDir) + .WithEnvironment(environment) + .BuildWorkflow(); + await using var testApp = await DaprHarnessBuilder.ForHarness(harness) + .ConfigureServices(builder => + { + builder.Services.AddDaprWorkflowBuilder( + opt => + { + opt.RegisterWorkflow(); + opt.RegisterActivity(); + }, + configureClient: (sp, cb) => + { + var config = sp.GetRequiredService(); + var grpcEndpoint = config["DAPR_GRPC_ENDPOINT"]; + if (!string.IsNullOrEmpty(grpcEndpoint)) + cb.UseGrpcEndpoint(grpcEndpoint); + }); + }) + .BuildAndStartAsync(); + + using var scope = testApp.CreateScope(); + var daprWorkflowClient = scope.ServiceProvider.GetRequiredService(); + + await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(TimestampCaptureWorkflow), workflowInstanceId); + var result = await daprWorkflowClient.WaitForWorkflowCompletionAsync( + workflowInstanceId, true, TestContext.Current.CancellationToken); + + Assert.Equal(WorkflowRuntimeStatus.Completed, result.RuntimeStatus); + + var output = result.ReadOutputAs(); + Assert.NotNull(output); + + // All three timestamps must be valid UTC. + Assert.Equal(DateTimeKind.Utc, output.BeforeFirstActivity.Kind); + Assert.Equal(DateTimeKind.Utc, output.AfterFirstActivity.Kind); + Assert.Equal(DateTimeKind.Utc, output.AfterSecondActivity.Kind); + + // CurrentUtcDateTime must be monotonically non-decreasing across replays. + // + // Without the fix, BeforeFirstActivity would show the third turn's + // OrchestratorStarted timestamp (the most recent one when the workflow + // code ran on the final replay), which is later than AfterFirstActivity + // (second turn's timestamp). That would make the sequence: + // BeforeFirstActivity = T3 > AfterFirstActivity = T2 ← wrong + // + // With the fix, the workflow always starts with T1 (the initial turn's + // timestamp) regardless of which replay is currently executing: + // T1 ≤ T2 ≤ T3 ✓ + Assert.True( + output.BeforeFirstActivity <= output.AfterFirstActivity, + $"CurrentUtcDateTime went backwards across a replay boundary: " + + $"before-first-activity={output.BeforeFirstActivity:O} is later than " + + $"after-first-activity={output.AfterFirstActivity:O}"); + + Assert.True( + output.AfterFirstActivity <= output.AfterSecondActivity, + $"CurrentUtcDateTime went backwards: " + + $"after-first-activity={output.AfterFirstActivity:O} is later than " + + $"after-second-activity={output.AfterSecondActivity:O}"); + } + + /// + /// The three snapshots captured + /// by . + /// + private sealed class TimestampOutput + { + public DateTime BeforeFirstActivity { get; init; } + public DateTime AfterFirstActivity { get; init; } + public DateTime AfterSecondActivity { get; init; } + } + + /// + /// Activity that sleeps for milliseconds so that Dapr + /// records a later OrchestratorStarted timestamp for the next turn. + /// + private sealed class DelayActivity : WorkflowActivity + { + public override async Task RunAsync(WorkflowActivityContext context, int input) + { + await Task.Delay(input); + return "done"; + } + } + + /// + /// Calls two activities in sequence, capturing + /// before the first activity and after each activity. + /// + private sealed class TimestampCaptureWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, object? input) + { + // Read before any await. On replay this is the value that was wrong: + // the bug caused it to reflect the *current* turn's start time rather + // than the *initial* turn's start time. + var beforeFirstActivity = context.CurrentUtcDateTime; + + // 100 ms delays ensure each activity completes in a measurably later + // Dapr turn, giving T1 < T2 < T3 with high confidence. + await context.CallActivityAsync(nameof(DelayActivity), 100); + var afterFirstActivity = context.CurrentUtcDateTime; + + await context.CallActivityAsync(nameof(DelayActivity), 100); + var afterSecondActivity = context.CurrentUtcDateTime; + + return new TimestampOutput + { + BeforeFirstActivity = beforeFirstActivity, + AfterFirstActivity = afterFirstActivity, + AfterSecondActivity = afterSecondActivity + }; + } + } +} diff --git a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs index 067ef830d..02f668aaf 100644 --- a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs @@ -1248,6 +1248,109 @@ public async Task HandleOrchestratorResponseAsync_ShouldAdvanceCurrentUtcDateTim Assert.Equal(string.Empty, complete.Result); } + /// + /// Regression test: CurrentUtcDateTime must equal the workflow's initial start time before the first + /// await on every replay, not the current turn's timestamp. + /// + /// The bug: WorkflowWorker initialised _currentUtcDateTime with the *current turn's* + /// OrchestratorStarted timestamp (T3) instead of the *first* history event's timestamp (T1). + /// The workflow code ran before ProcessEvents and read the wrong time. + /// + [Fact] + public async Task HandleOrchestratorResponseAsync_CurrentUtcDateTime_IsConsistentBeforeFirstAwait_OnReplay() + { + var sp = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + + var t1 = new DateTime(2025, 01, 01, 12, 0, 0, DateTimeKind.Utc); // workflow started + var t2 = t1.AddSeconds(5); // activity completed + var t3 = t2.AddSeconds(5); // current turn start + + DateTime capturedBeforeAwait = default; + DateTime capturedAfterActivityAwait = default; + + var factory = new StubWorkflowsFactory(); + factory.AddWorkflow("wf", new InlineWorkflow( + inputType: typeof(object), + run: async (ctx, _) => + { + capturedBeforeAwait = ctx.CurrentUtcDateTime; // must equal T1 on every replay + await ctx.CallActivityAsync("act"); + capturedAfterActivityAwait = ctx.CurrentUtcDateTime; // must equal T2 + return null; + })); + factory.AddActivity("act", new InlineActivity( + inputType: typeof(object), + run: (_, _) => Task.FromResult("result"))); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + factory, + NullLoggerFactory.Instance, + serializer, + sp, + options); + + // Simulate a replay turn: PastEvents contain the first turn's history (activity scheduled + // and completed), NewEvents hold the current turn's OrchestratorStarted at the later time T3. + // Before the fix, CurrentUtcDateTime before the first await would be T3, not T1. + var request = new OrchestratorRequest + { + InstanceId = "i", + PastEvents = + { + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(t1), + ExecutionStarted = new ExecutionStartedEvent { Name = "wf" } + }, + new HistoryEvent + { + EventId = 0, + TaskScheduled = new TaskScheduledEvent { Name = "act" } + }, + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(t2), + OrchestratorStarted = new OrchestratorStartedEvent() + }, + new HistoryEvent + { + TaskCompleted = new TaskCompletedEvent + { + TaskScheduledId = 0, + Result = "\"result\"" + } + } + }, + NewEvents = + { + // Current turn starts at T3 — this is what the bug incorrectly used as + // the initial CurrentUtcDateTime before any workflow code ran. + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(t3), + OrchestratorStarted = new OrchestratorStartedEvent() + } + } + }; + + var response = await InvokeHandleOrchestratorResponseAsync(worker, request); + + Assert.Equal("i", response.InstanceId); + var complete = response.Actions.Single(a => a.CompleteOrchestration != null).CompleteOrchestration!; + Assert.Equal(OrchestrationStatus.Completed, complete.OrchestrationStatus); + + // Before the fix this was T3 (the current turn's timestamp). It must be T1 so that + // the value the workflow observes before its first await is consistent across replays. + Assert.Equal(t1, capturedBeforeAwait); + + // After the activity completes the clock should have advanced to T2, as recorded + // by the OrchestratorStarted event that preceded the TaskCompleted event. + Assert.Equal(t2, capturedAfterActivityAwait); + } + [Fact] public async Task HandleOrchestratorResponseAsync_ShouldCompleted_WhenEventReceived() {