diff --git a/src/Dapr.Workflow/Worker/WorkflowWorker.cs b/src/Dapr.Workflow/Worker/WorkflowWorker.cs index 19a72a614..c3ec7af7c 100644 --- a/src/Dapr.Workflow/Worker/WorkflowWorker.cs +++ b/src/Dapr.Workflow/Worker/WorkflowWorker.cs @@ -147,9 +147,6 @@ private async Task HandleOrchestratorResponseAsync(Orchest object? input = string.IsNullOrEmpty(serializedInput) ? null : _serializer.Deserialize(serializedInput, workflow!.InputType); - - // Replay the old history to rebuild the local state of the orchestration. - context.ProcessEvents(allPastEvents, true); // Execute the workflow // IMPORTANT: Durable orchestrations intentionally "block" on incomplete tasks (activities, timers, events) @@ -157,7 +154,10 @@ private async Task HandleOrchestratorResponseAsync(Orchest // We run the workflow BEFORE processing new events so that the code reaches the 'await' points and registers tasks // in _openTasks. Then, ProcessEvents(NewEvents) can satisfy those tasks. var runTask = workflow!.RunAsync(context, input); - + + // Replay the old history to rebuild the local state of the orchestration. + context.ProcessEvents(allPastEvents, true); + // Play the newly arrived events to determine the next action to take. context.ProcessEvents(request.NewEvents, false); diff --git a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs index c8c23cbd7..d5bfde61d 100644 --- a/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs +++ b/test/Dapr.Workflow.Test/Worker/Internal/WorkflowOrchestrationContextTests.cs @@ -469,7 +469,7 @@ public async Task WaitForExternalEventAsync_ShouldReturnDeserializedValue_WhenEv } [Fact] - public async Task WaitForExternalEventAsync_ShouldReturnUncompletedTask_WhenEventInHistory() + public async Task WaitForExternalEventAsync_ShouldReturnCompletedTask_WhenEventInHistory() { var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); @@ -518,7 +518,7 @@ public async Task WaitForExternalEventAsync_ShouldReturnUncompletedTask_WhenEven } [Fact] - public async Task WaitForExternalEventAsync_WithTimeoutOverload_ShouldCancel_WhenEventReceived() + public async Task WaitForExternalEventAsync_WithTimeoutOverload_ShouldReturnCompletedTask_WhenEventReceived() { var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); diff --git a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs index 33e9338b9..e509abcbf 100644 --- a/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs +++ b/test/Dapr.Workflow.Test/Worker/WorkflowWorkerTests.cs @@ -910,6 +910,246 @@ public async Task HandleOrchestratorResponseAsync_ShouldUseFirstEventTimestamp_W Assert.Equal(string.Empty, complete.Result); } + [Fact] + public async Task HandleOrchestratorResponseAsync_ShouldAdvanceCurrentUtcDateTime_WhenTimerFires() + { + var sp = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + var beginDateTime = new DateTime(2025, 01, 01, 12, 0, 0, DateTimeKind.Utc); + + var factory = new StubWorkflowsFactory(); + factory.AddWorkflow("wf", new InlineWorkflow( + inputType: typeof(int), + run: async (ctx, input) => + { + Assert.Equal(beginDateTime, ctx.CurrentUtcDateTime); + await ctx.CreateTimer(TimeSpan.FromSeconds(5)); + Assert.Equal(beginDateTime.AddSeconds(5), ctx.CurrentUtcDateTime); + + return null; + })); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + factory, + NullLoggerFactory.Instance, + serializer, + sp, + options); + + var request = new OrchestratorRequest + { + InstanceId = "i", + PastEvents = + { + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime), + ExecutionStarted = new ExecutionStartedEvent + { + Name = "wf", + Input = "123" + } + }, + new HistoryEvent + { + EventId = 0, + TimerCreated = new TimerCreatedEvent + { + FireAt = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(5)) + } + }, + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(5)), + OrchestratorStarted = new OrchestratorStartedEvent() + }, + new HistoryEvent + { + TimerFired = new TimerFiredEvent + { + TimerId = 0, + FireAt = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(5)) + } + } + } + }; + + var response = await InvokeHandleOrchestratorResponseAsync(worker, request); + + Assert.Equal("i", response.InstanceId); + var complete = response.Actions.Single(a => a.CompleteOrchestration != null).CompleteOrchestration!; + Assert.Equal(OrchestrationStatus.Completed, complete.OrchestrationStatus); + Assert.Equal(string.Empty, complete.Result); + } + + [Fact] + public async Task HandleOrchestratorResponseAsync_ShouldCompleted_WhenEventReceived() + { + var sp = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + var beginDateTime = new DateTime(2025, 01, 01, 12, 0, 0, DateTimeKind.Utc); + + var factory = new StubWorkflowsFactory(); + factory.AddWorkflow("wf", new InlineWorkflow( + inputType: typeof(int), + run: async (ctx, input) => + { + await ctx.WaitForExternalEventAsync("MyEvent", TimeSpan.FromSeconds(5)); + return null; + })); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + factory, + NullLoggerFactory.Instance, + serializer, + sp, + options); + + var request = new OrchestratorRequest + { + InstanceId = "i", + PastEvents = + { + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime), + ExecutionStarted = new ExecutionStartedEvent + { + Name = "wf", + Input = "123" + } + }, + new HistoryEvent + { + EventId = 0, + TimerCreated = new TimerCreatedEvent + { + FireAt = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(5)) + } + }, + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(2)), + OrchestratorStarted = new OrchestratorStartedEvent() + }, + new HistoryEvent + { + EventRaised = new EventRaisedEvent + { + Name = "myevent" + } + }, + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(5)), + OrchestratorStarted = new OrchestratorStartedEvent() + }, + new HistoryEvent + { + TimerFired = new TimerFiredEvent + { + TimerId = 0, + FireAt = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(5)) + } + } + } + }; + + var response = await InvokeHandleOrchestratorResponseAsync(worker, request); + + Assert.Equal("i", response.InstanceId); + var complete = response.Actions.Single(a => a.CompleteOrchestration != null).CompleteOrchestration!; + Assert.Equal(OrchestrationStatus.Completed, complete.OrchestrationStatus); + Assert.Equal(string.Empty, complete.Result); + } + + [Fact] + public async Task HandleOrchestratorResponseAsync_ShouldReturnFailureDetails_WhenTimerFires() + { + var sp = new ServiceCollection().BuildServiceProvider(); + var serializer = new JsonWorkflowSerializer(new JsonSerializerOptions(JsonSerializerDefaults.Web)); + var options = new WorkflowRuntimeOptions(); + var beginDateTime = new DateTime(2025, 01, 01, 12, 0, 0, DateTimeKind.Utc); + + var factory = new StubWorkflowsFactory(); + factory.AddWorkflow("wf", new InlineWorkflow( + inputType: typeof(int), + run: async (ctx, input) => + { + await ctx.WaitForExternalEventAsync("MyEvent", TimeSpan.FromSeconds(5)); + return null; + })); + + var worker = new WorkflowWorker( + CreateGrpcClientMock().Object, + factory, + NullLoggerFactory.Instance, + serializer, + sp, + options); + + var request = new OrchestratorRequest + { + InstanceId = "i", + PastEvents = + { + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime), + ExecutionStarted = new ExecutionStartedEvent + { + Name = "wf", + Input = "123" + } + }, + new HistoryEvent + { + EventId = 0, + TimerCreated = new TimerCreatedEvent + { + FireAt = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(5)) + } + }, + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(5)), + OrchestratorStarted = new OrchestratorStartedEvent() + }, + new HistoryEvent + { + TimerFired = new TimerFiredEvent + { + TimerId = 0, + FireAt = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(5)) + } + }, + new HistoryEvent + { + Timestamp = Google.Protobuf.WellKnownTypes.Timestamp.FromDateTime(beginDateTime.AddSeconds(10)), + OrchestratorStarted = new OrchestratorStartedEvent() + }, + new HistoryEvent + { + EventRaised = new EventRaisedEvent + { + Name = "myevent" + } + } + } + }; + + var response = await InvokeHandleOrchestratorResponseAsync(worker, request); + + Assert.Equal("i", response.InstanceId); + var complete = response.Actions.Single(a => a.CompleteOrchestration != null).CompleteOrchestration!; + Assert.Equal(OrchestrationStatus.Failed, complete.OrchestrationStatus); + Assert.NotNull(complete.FailureDetails); + } + [Fact] public async Task HandleActivityResponseAsync_ShouldUseEmptyInstanceId_WhenOrchestrationInstanceIsNull_AndReturnEmptyResult_WhenOutputIsNull() {