diff --git a/src/InProcessTestHost/InProcessTestHost.csproj b/src/InProcessTestHost/InProcessTestHost.csproj index 5114e3df5..a8f8194d8 100644 --- a/src/InProcessTestHost/InProcessTestHost.csproj +++ b/src/InProcessTestHost/InProcessTestHost.csproj @@ -5,7 +5,7 @@ Microsoft.DurableTask.Testing Microsoft.DurableTask.InProcessTestHost Microsoft.DurableTask.InProcessTestHost - 0.2.2-preview.1 + 0.2.3-preview.1 $(NoWarn);CA1848 diff --git a/src/InProcessTestHost/Sidecar/Dispatcher/TaskOrchestrationDispatcher.cs b/src/InProcessTestHost/Sidecar/Dispatcher/TaskOrchestrationDispatcher.cs index 3dfe627b4..cc2812a83 100644 --- a/src/InProcessTestHost/Sidecar/Dispatcher/TaskOrchestrationDispatcher.cs +++ b/src/InProcessTestHost/Sidecar/Dispatcher/TaskOrchestrationDispatcher.cs @@ -111,12 +111,12 @@ protected override async Task ExecuteWorkItemAsync(TaskOrchestrationWorkItem wor { throw new ArgumentException($"Could not find an orchestration instance ID in the work item's runtime state.", nameof(workItem)); } - - var activityMessages = new List<TaskMessage>(); - var timerMessages = new List<TaskMessage>(); - var orchestratorMessages = new List<TaskMessage>(); - - // We loop for as long as the orchestrator does a ContinueAsNew + + var activityMessages = new List<TaskMessage>(); + var timerMessages = new List<TaskMessage>(); + var orchestratorMessages = new List<TaskMessage>(); + + // We loop for as long as the orchestrator does a ContinueAsNew while (true) { if (this.log.IsEnabled(LogLevel.Debug)) @@ -149,6 +149,16 @@ protected override async Task ExecuteWorkItemAsync(TaskOrchestrationWorkItem wor out bool continueAsNew); if (continueAsNew) { + // The previous execution is being replaced by a new one. Clear any + // accumulated messages from the old execution so they are not + // re-enqueued when the work item completes. Without this, stale + // activity/timer/orchestrator messages from prior iterations would + // be committed alongside the new execution, causing duplicate + // activities, stale timer fires, and ultimately stuck instances. 
+ activityMessages.Clear(); + timerMessages.Clear(); + orchestratorMessages.Clear(); + // Continue running the orchestration with a new history. // Renew the lock if we're getting close to its expiration. if (workItem.LockedUntilUtc != default && DateTime.UtcNow.AddMinutes(1) > workItem.LockedUntilUtc) @@ -365,11 +375,11 @@ void ApplyOrchestratorActions( Input = sendEventAction.EventData, }; - runtimeState.AddEvent(sendEvent); - - EventRaisedEvent eventRaisedEvent = new(-1, sendEventAction.EventData) - { - Name = sendEventAction.EventName + runtimeState.AddEvent(sendEvent); + + EventRaisedEvent eventRaisedEvent = new(-1, sendEventAction.EventData) + { + Name = sendEventAction.EventName }; orchestratorMessages.Add(new TaskMessage diff --git a/src/InProcessTestHost/Sidecar/Dispatcher/WorkItemDispatcher.cs b/src/InProcessTestHost/Sidecar/Dispatcher/WorkItemDispatcher.cs index d026bc3bd..4a5acf794 100644 --- a/src/InProcessTestHost/Sidecar/Dispatcher/WorkItemDispatcher.cs +++ b/src/InProcessTestHost/Sidecar/Dispatcher/WorkItemDispatcher.cs @@ -156,10 +156,10 @@ async Task WaitForAllClear(CancellationToken cancellationToken) { TimeSpan logInterval = TimeSpan.FromMinutes(1); - // IMPORTANT: This logic assumes only a single logical "thread" is executing the receive loop, - // and that there's no possible race condition when comparing work-item counts. + // IMPORTANT: The receive loop is expected to have a single logical execution path, but the + // work-item count can still change concurrently and must be read using thread-safe access. DateTime nextLogTime = DateTime.MinValue; - while (this.currentWorkItems >= this.MaxWorkItems) + while (Volatile.Read(ref this.currentWorkItems) >= this.MaxWorkItems) { // Periodically log that we're waiting for available concurrency. // No need to use UTC for this. Local time is a bit easier to debug. 
@@ -169,7 +169,7 @@ async Task WaitForAllClear(CancellationToken cancellationToken) this.log.FetchingThrottled( dispatcher: this.name, details: "The current active work-item count has reached the allowed maximum.", - this.currentWorkItems, + Volatile.Read(ref this.currentWorkItems), this.MaxWorkItems); nextLogTime = now.Add(logInterval); } @@ -192,14 +192,14 @@ async Task WaitForAllClear(CancellationToken cancellationToken) async Task WaitForOutstandingWorkItems(CancellationToken cancellationToken) { DateTime nextLogTime = DateTime.MinValue; - while (this.currentWorkItems > 0) + while (Volatile.Read(ref this.currentWorkItems) > 0) { // Periodically log that we're waiting for outstanding work items to complete. // No need to use UTC for this. Local time is a bit easier to debug. DateTime now = DateTime.Now; if (now >= nextLogTime) { - this.log.DispatcherStopping(this.name, this.currentWorkItems); + this.log.DispatcherStopping(this.name, Volatile.Read(ref this.currentWorkItems)); nextLogTime = now.AddMinutes(1); } @@ -265,7 +265,7 @@ async Task FetchAndExecuteLoop(CancellationToken cancellationToken) if (workItem != null) { - this.currentWorkItems++; + Interlocked.Increment(ref this.currentWorkItems); this.log.FetchWorkItemCompleted( this.name, this.GetWorkItemId(workItem), @@ -334,7 +334,7 @@ async Task ExecuteWorkItem(T workItem) details: ex.ToString()); } - this.currentWorkItems--; + Interlocked.Decrement(ref this.currentWorkItems); } } } diff --git a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs index 81e37311d..e4f9c027b 100644 --- a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs +++ b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs @@ -653,7 +653,7 @@ public void AddMessage(TaskMessage message) SerializedInstanceState state = this.store.GetOrAdd(instanceId, id => new SerializedInstanceState(id, executionId)); lock (state) { - bool isRestart = 
state.ExecutionId != null && state.ExecutionId != executionId; + bool isRestart = executionId != null && state.ExecutionId != null && state.ExecutionId != executionId; if (message.Event is ExecutionStartedEvent startEvent) { @@ -684,11 +684,14 @@ public void AddMessage(TaskMessage message) else if (state.IsCompleted) { // Drop the message since we're completed - // GOOD: The user-provided the instanceId - // logger.LogWarning( - // "Dropped {eventType} message for instance '{instanceId}' because the orchestration has already completed.", - // message.Event.EventType, - // instanceId); + return; + } + else if (isRestart) + { + // Drop messages that belong to a previous execution (stale activity + // completions, timer fires, etc.). These can arrive when a + // ContinueAsNew created a new execution but work items from the + // old execution are still in flight. return; } diff --git a/test/InProcessTestHost.Tests/ContinueAsNewTests.cs b/test/InProcessTestHost.Tests/ContinueAsNewTests.cs index c5805643a..19d7934dc 100644 --- a/test/InProcessTestHost.Tests/ContinueAsNewTests.cs +++ b/test/InProcessTestHost.Tests/ContinueAsNewTests.cs @@ -254,3 +254,253 @@ public void Update(AsyncOperation result) #endregion } + +/// +/// Tests targeting the ContinueAsNew race condition fix where stale messages from +/// prior ContinueAsNew iterations (activities, timers) could corrupt the new execution's +/// message queue or cause instances to get permanently stuck. +/// +public class ContinueAsNewRaceConditionTests +{ + readonly ITestOutputHelper output; + + public ContinueAsNewRaceConditionTests(ITestOutputHelper output) + { + this.output = output; + } + + /// + /// Reproduces the original stuck-instance scenario: 4+ orchestrations running + /// concurrently, each calling an activity, waiting on a timer, then ContinueAsNew, + /// repeated for multiple iterations. 
Without the fix, some instances would stop + /// making progress after 2-3 ContinueAsNew iterations because stale activity/timer + /// messages from prior iterations would corrupt the message queue. + /// + [Fact(Timeout = 120_000)] + public async Task ConcurrentContinueAsNew_MultipleIterations_NoneGetStuck() + { + const int instanceCount = 6; + const int targetIterations = 5; + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks => + { + tasks.AddOrchestratorFunc<int, int>("IteratingOrchestrator", async (context, iteration) => + { + // Call activity + await context.CallActivityAsync("NoOpActivity", iteration); + + // Wait on timer (short delay to exercise timer message path) + await context.CreateTimer(TimeSpan.FromMilliseconds(500), CancellationToken.None); + + if (iteration < targetIterations) + { + context.ContinueAsNew(iteration + 1); + return -1; // unreachable + } + + return iteration; + }); + + tasks.AddActivityFunc<int, string>("NoOpActivity", (context, iteration) => + { + return $"done-{iteration}"; + }); + }); + + // Schedule all instances concurrently + string[] instanceIds = new string[instanceCount]; + for (int i = 0; i < instanceCount; i++) + { + instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync( + "IteratingOrchestrator", 1); + } + + this.output.WriteLine($"Scheduled {instanceCount} orchestrations, each targeting {targetIterations} iterations"); + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(90)); + + Task<OrchestrationMetadata>[] waitTasks = instanceIds + .Select(id => host.Client.WaitForInstanceCompletionAsync( + id, getInputsAndOutputs: true, cts.Token)) + .ToArray(); + + OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); + + for (int i = 0; i < instanceCount; i++) + { + Assert.NotNull(results[i]); + Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); + int output = results[i].ReadOutputAs<int>(); + Assert.Equal(targetIterations, output); + this.output.WriteLine($"Instance[{i}] 
({instanceIds[i]}): completed at iteration {output}"); + } + } + + /// + /// Tests that an orchestration performing many ContinueAsNew iterations with + /// both activities and timers on each iteration completes correctly. This exercises + /// the fix that clears accumulated message lists between iterations — without it, + /// stale timer/activity messages would accumulate and cause duplicate scheduling. + /// + [Fact(Timeout = 60_000)] + public async Task SingleInstance_ManyContinueAsNewIterations_CompletesCorrectly() + { + const int targetIterations = 8; + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks => + { + tasks.AddOrchestratorFunc<int, int>("MultiIterationOrchestrator", async (context, iteration) => + { + // Each iteration: activity + timer + ContinueAsNew + await context.CallActivityAsync("EchoActivity", $"iter-{iteration}"); + await context.CreateTimer(TimeSpan.FromMilliseconds(100), CancellationToken.None); + + if (iteration < targetIterations) + { + context.ContinueAsNew(iteration + 1); + return -1; + } + + return iteration; + }); + + tasks.AddActivityFunc<string, string>("EchoActivity", (context, input) => $"echo:{input}"); + }); + + string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync( + "MultiIterationOrchestrator", 1); + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30)); + OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, cts.Token); + + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(targetIterations, metadata.ReadOutputAs<int>()); + } + + /// + /// Verifies that ContinueAsNew works correctly when orchestrations call multiple + /// activities per iteration. This amplifies the stale-message problem because each + /// iteration generates more activity messages that must be properly discarded. 
+ /// + [Fact(Timeout = 60_000)] + public async Task ContinueAsNew_MultipleActivitiesPerIteration_AllComplete() + { + const int instanceCount = 4; + const int activitiesPerIteration = 3; + const int targetIterations = 4; + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks => + { + tasks.AddOrchestratorFunc<int, int>("MultiActivityOrchestrator", async (context, iteration) => + { + // Call multiple activities per iteration + List<Task<string>> activityTasks = new(); + for (int i = 0; i < activitiesPerIteration; i++) + { + activityTasks.Add(context.CallActivityAsync<string>( + "IndexedActivity", $"iter{iteration}-act{i}")); + } + + await Task.WhenAll(activityTasks); + await context.CreateTimer(TimeSpan.FromMilliseconds(200), CancellationToken.None); + + if (iteration < targetIterations) + { + context.ContinueAsNew(iteration + 1); + return -1; + } + + return iteration; + }); + + tasks.AddActivityFunc<string, string>("IndexedActivity", (context, input) => $"result:{input}"); + }); + + string[] instanceIds = new string[instanceCount]; + for (int i = 0; i < instanceCount; i++) + { + instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync( + "MultiActivityOrchestrator", 1); + } + + this.output.WriteLine($"Scheduled {instanceCount} orchestrations with {activitiesPerIteration} activities per iteration"); + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(45)); + Task<OrchestrationMetadata>[] waitTasks = instanceIds + .Select(id => host.Client.WaitForInstanceCompletionAsync( + id, getInputsAndOutputs: true, cts.Token)) + .ToArray(); + + OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); + + for (int i = 0; i < instanceCount; i++) + { + Assert.NotNull(results[i]); + Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); + Assert.Equal(targetIterations, results[i].ReadOutputAs<int>()); + } + } + + /// + /// Runs the reproduction scenario from the original bug report repeatedly to ensure + /// reliability: 4+ concurrent instances, each doing activity + 
5s timer + ContinueAsNew, + /// across multiple independent rounds with fresh hosts. + /// + [Fact(Timeout = 120_000)] + public async Task ReproScenario_RepeatedRounds_AllComplete() + { + const int instanceCount = 4; + const int rounds = 3; + const int targetIterations = 3; + + for (int round = 0; round < rounds; round++) + { + this.output.WriteLine($"=== Round {round + 1}/{rounds} ==="); + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(tasks => + { + tasks.AddOrchestratorFunc<int, int>("ReproOrchestrator", async (context, iteration) => + { + await context.CallActivityAsync("ReproActivity", iteration); + await context.CreateTimer(TimeSpan.FromSeconds(5), CancellationToken.None); + + if (iteration < targetIterations) + { + context.ContinueAsNew(iteration + 1); + return -1; + } + + return iteration; + }); + + tasks.AddActivityFunc<int, string>("ReproActivity", (context, input) => $"ok-{input}"); + }); + + string[] instanceIds = new string[instanceCount]; + for (int i = 0; i < instanceCount; i++) + { + instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync( + "ReproOrchestrator", 1); + } + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(45)); + Task<OrchestrationMetadata>[] waitTasks = instanceIds + .Select(id => host.Client.WaitForInstanceCompletionAsync( + id, getInputsAndOutputs: true, cts.Token)) + .ToArray(); + + OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); + + for (int i = 0; i < instanceCount; i++) + { + Assert.NotNull(results[i]); + Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); + Assert.Equal(targetIterations, results[i].ReadOutputAs<int>()); + } + + this.output.WriteLine($"Round {round + 1}: all {instanceCount} completed"); + } + } +}