diff --git a/src/InProcessTestHost/InProcessTestHost.csproj b/src/InProcessTestHost/InProcessTestHost.csproj index 9cdc750c0..5114e3df5 100644 --- a/src/InProcessTestHost/InProcessTestHost.csproj +++ b/src/InProcessTestHost/InProcessTestHost.csproj @@ -5,7 +5,7 @@ Microsoft.DurableTask.Testing Microsoft.DurableTask.InProcessTestHost Microsoft.DurableTask.InProcessTestHost - 0.2.1-preview.1 + 0.2.2-preview.1 $(NoWarn);CA1848 diff --git a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs index 50f1fe155..28214435d 100644 --- a/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs +++ b/src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs @@ -815,7 +815,8 @@ async Task ITaskExecutor.ExecuteOrchestrator( totalBytes += ev.CalculateSize(); } - if (this.supportsHistoryStreaming && totalBytes > (1024)) + const int HistoryStreamingThresholdBytes = 1024 * 1024; // 1 MiB + if (this.supportsHistoryStreaming && totalBytes > HistoryStreamingThresholdBytes) { orkRequest.RequiresHistoryStreaming = true; // Store past events to serve via StreamInstanceHistory @@ -901,8 +902,7 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem) lock (this.isConnectedSignal) { outputStream = this.workerToClientStream ?? - // CA2201: Use specific exception types - throw new InvalidOperationException("No client is connected. Need to wait until a client connects before executing."); + throw new RpcException(new Status(StatusCode.Unavailable, "No client is connected.")); } // The gRPC channel can only handle one message at a time, so we need to serialize access to it. @@ -911,6 +911,22 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem) { await outputStream.WriteAsync(workItem); } + catch (InvalidOperationException ex) when (ex.Message.Contains("request is complete", StringComparison.OrdinalIgnoreCase)) + { + // The client disconnected or canceled the GetWorkItems stream. + // Reset the connection state so the dispatcher pauses naturally + // (via the traffic signal) until a new client connects. + lock (this.isConnectedSignal) + { + this.workerToClientStream = null; + this.isConnectedSignal.Reset(); + } + + // Must throw so callers (ExecuteOrchestrator/ExecuteActivity) can clean up + // their pending TCS. The dispatcher catches this, abandons the work item, + // and releases it back to the queue for retry. + throw new OperationCanceledException("Work-item stream closed by client.", ex); + } finally { this.sendWorkItemLock.Release(); @@ -919,7 +935,7 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem) TaskCompletionSource CreateTaskCompletionSourceForOrchestrator(string instanceId) { - TaskCompletionSource tcs = new(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); this.pendingOrchestratorTasks.TryAdd(instanceId, tcs); return tcs; } @@ -933,7 +949,7 @@ void RemoveOrchestratorTaskCompletionSource(string instanceId) TaskCompletionSource CreateTaskCompletionSourceForActivity(string instanceId, int taskId) { string taskIdKey = GetTaskIdKey(instanceId, taskId); - TaskCompletionSource tcs = new(); + TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); this.pendingActivityTasks.TryAdd(taskIdKey, tcs); return tcs; } diff --git a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs index 17d69c5e3..81e37311d 100644 --- a/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs +++ b/src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs @@ -742,7 +742,7 @@ public Task WaitForInstanceAsync(string instanceId, Cancella // First, add the waiter before checking completion to avoid a race condition. // This ensures we don't miss a completion notification that happens between // checking the status and adding the waiter. - var tcs = this.waiters.GetOrAdd(instanceId, _ => new TaskCompletionSource()); + var tcs = this.waiters.GetOrAdd(instanceId, _ => new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)); // Now check if already completed - if so, complete the waiter immediately if (this.store.TryGetValue(instanceId, out SerializedInstanceState? state)) diff --git a/test/InProcessTestHost.Tests/ContinueAsNewTests.cs b/test/InProcessTestHost.Tests/ContinueAsNewTests.cs new file mode 100644 index 000000000..c5805643a --- /dev/null +++ b/test/InProcessTestHost.Tests/ContinueAsNewTests.cs @@ -0,0 +1,256 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Testing; +using Microsoft.DurableTask.Worker; +using Xunit; +using Xunit.Abstractions; + +namespace InProcessTestHost.Tests; + +/// +/// Tests that orchestrations using ContinueAsNew with activity calls and timers +/// resume correctly after each iteration and eventually complete. +/// +public class ContinueAsNewTests +{ + readonly ITestOutputHelper output; + + public ContinueAsNewTests(ITestOutputHelper output) + { + this.output = output; + } + + /// + /// Registers a polling-style orchestrator and a stateful activity. + /// + /// TestOrchestrator: + /// 1. Calls TestActivity with current state + /// 2. Updates state from activity result + /// 3. If not closed: waits 1s timer, then ContinueAsNew with updated state + /// 4. If closed: orchestration ends + /// + /// TestActivity: + /// - First call: sets status to InProgress + /// - Second call: sets status to Succeeded (triggers orchestration completion) + /// + static void RegisterTestFunctions(DurableTaskRegistry tasks) + { + tasks.AddOrchestratorFunc("TestOrchestrator", async (context, input) => + { + var result = await context.CallActivityAsync("TestActivity", input); + input.Update(result); + + if (!input.Closed) + { + await context.CreateTimer(TimeSpan.FromSeconds(1), CancellationToken.None); + context.ContinueAsNew(input); + return; + } + }); + + tasks.AddActivityFunc("TestActivity", (context, input) => + { + if (input.Status == Status.InProgress) + { + input.Status = Status.Succeeded; + } + else + { + input.Status = Status.InProgress; + } + + return input; + }); + } + + /// + /// Verifies that a single orchestration calling an activity, waiting on a timer, + /// and then using ContinueAsNew completes after 2 iterations without hanging. + /// Covers the basic ContinueAsNew lifecycle: activity call -> state update -> + /// timer -> ContinueAsNew -> activity call -> completion. + /// + [Fact(Timeout = 30_000)] + public async Task Orchestration_ContinueAsNew_WithActivityAndTimer_CompletesSuccessfully() + { + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions); + + var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 }; + string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input); + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(20)); + OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, cts.Token); + + Assert.NotNull(metadata); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + } + + /// + /// Runs 10 ContinueAsNew orchestrations in parallel, repeated across 3 rounds + /// with a fresh host each round. Validates that activity completion messages + /// are correctly delivered under contention when many orchestrations compete + /// for the dispatcher, ready-to-run queue, and instance locks simultaneously. + /// Total: 30 orchestration instances across 3 rounds. + /// + [Fact(Timeout = 60_000)] + public async Task ConcurrentOrchestrations_ContinueAsNew_AllComplete() + { + const int orchestrationCount = 10; + const int rounds = 3; + + for (int round = 0; round < rounds; round++) + { + this.output.WriteLine($"=== Round {round + 1}/{rounds} ==="); + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions); + + string[] instanceIds = new string[orchestrationCount]; + for (int i = 0; i < orchestrationCount; i++) + { + var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 }; + instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input); + } + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30)); + + Task[] waitTasks = instanceIds + .Select(id => host.Client.WaitForInstanceCompletionAsync( + id, getInputsAndOutputs: true, cts.Token)) + .ToArray(); + + OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); + + for (int i = 0; i < orchestrationCount; i++) + { + Assert.NotNull(results[i]); + Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); + } + + this.output.WriteLine($"Round {round + 1}: all {orchestrationCount} completed"); + } + } + + /// + /// Schedules 20 ContinueAsNew orchestrations on a single host as fast as possible. + /// All 20 share the same dispatcher and ready-to-run queue for their entire lifecycle, + /// maximizing interleaving of activity completion messages and ContinueAsNew + /// re-schedules across instances. + /// + [Fact(Timeout = 60_000)] + public async Task RapidFire_SequentialScheduling_ContinueAsNew_AllComplete() + { + const int orchestrationCount = 20; + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions); + + string[] instanceIds = new string[orchestrationCount]; + for (int i = 0; i < orchestrationCount; i++) + { + var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 }; + instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input); + } + + this.output.WriteLine($"Scheduled {orchestrationCount} orchestrations on a single host"); + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30)); + + Task[] waitTasks = instanceIds + .Select(id => host.Client.WaitForInstanceCompletionAsync( + id, getInputsAndOutputs: true, cts.Token)) + .ToArray(); + + OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); + + for (int i = 0; i < orchestrationCount; i++) + { + Assert.NotNull(results[i]); + Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); + } + + this.output.WriteLine($"All {orchestrationCount} completed"); + } + + /// + /// Schedules 30 ContinueAsNew orchestrations in 3 waves of 10, with 100ms delays + /// between waves. The staggered scheduling creates overlapping lifecycle phases — + /// earlier orchestrations may be mid-ContinueAsNew (waiting on timer or activity) + /// when later waves arrive, producing varied timing patterns that exercise the + /// interplay between lock release, message delivery, and queue re-scheduling. + /// + [Fact(Timeout = 60_000)] + public async Task Staggered_ContinueAsNew_AllComplete() + { + const int wavesCount = 3; + const int perWave = 10; + + await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions); + + List allInstanceIds = new(); + + for (int wave = 0; wave < wavesCount; wave++) + { + for (int i = 0; i < perWave; i++) + { + var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 }; + string id = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input); + allInstanceIds.Add(id); + } + + this.output.WriteLine($"Wave {wave + 1}/{wavesCount}: scheduled {perWave} orchestrations"); + await Task.Delay(100); + } + + this.output.WriteLine($"Total scheduled: {allInstanceIds.Count}"); + + using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30)); + + Task[] waitTasks = allInstanceIds + .Select(id => host.Client.WaitForInstanceCompletionAsync( + id, getInputsAndOutputs: true, cts.Token)) + .ToArray(); + + OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); + + for (int i = 0; i < results.Length; i++) + { + Assert.NotNull(results[i]); + Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); + } + + this.output.WriteLine($"All {allInstanceIds.Count} completed"); + } + + #region Models + + public enum Status + { + NotStarted, + InProgress, + Succeeded, + } + + public class AsyncOperation + { + public Status Status { get; set; } + + public bool Closed { get; set; } + + public int IterationCount { get; set; } + + public void Update(AsyncOperation result) + { + this.Status = result.Status; + this.IterationCount++; + + if (this.Status == Status.Succeeded) + { + this.Closed = true; + } + } + } + + #endregion +}