diff --git a/src/GreenDonut/src/GreenDonut/Instrumentation/AggregateDataLoaderDiagnosticEventListener.cs b/src/GreenDonut/src/GreenDonut/Instrumentation/AggregateDataLoaderDiagnosticEventListener.cs index db92769a644..08debbe1dbc 100644 --- a/src/GreenDonut/src/GreenDonut/Instrumentation/AggregateDataLoaderDiagnosticEventListener.cs +++ b/src/GreenDonut/src/GreenDonut/Instrumentation/AggregateDataLoaderDiagnosticEventListener.cs @@ -92,11 +92,11 @@ public override void BatchEvaluated(int openBatches) } /// - public override void BatchDispatched(int dispatchedBatches, bool inParallel) + public override void BatchDispatched(int dispatchedBatches) { for (var i = 0; i < listeners.Length; i++) { - listeners[i].BatchDispatched(dispatchedBatches, inParallel); + listeners[i].BatchDispatched(dispatchedBatches); } } diff --git a/src/GreenDonut/src/GreenDonut/Instrumentation/DataLoaderDiagnosticEventListener.cs b/src/GreenDonut/src/GreenDonut/Instrumentation/DataLoaderDiagnosticEventListener.cs index 65b7b7e2c79..3bf205070be 100644 --- a/src/GreenDonut/src/GreenDonut/Instrumentation/DataLoaderDiagnosticEventListener.cs +++ b/src/GreenDonut/src/GreenDonut/Instrumentation/DataLoaderDiagnosticEventListener.cs @@ -59,7 +59,7 @@ public virtual void BatchEvaluated(int openBatches) { } /// - public virtual void BatchDispatched(int dispatchedBatches, bool inParallel) + public virtual void BatchDispatched(int dispatchedBatches) { } private sealed class EmptyActivityScope : IDisposable diff --git a/src/GreenDonut/src/GreenDonut/Instrumentation/IDataLoaderDiagnosticEvents.cs b/src/GreenDonut/src/GreenDonut/Instrumentation/IDataLoaderDiagnosticEvents.cs index 91c83fb66c9..c6b36b0077e 100644 --- a/src/GreenDonut/src/GreenDonut/Instrumentation/IDataLoaderDiagnosticEvents.cs +++ b/src/GreenDonut/src/GreenDonut/Instrumentation/IDataLoaderDiagnosticEvents.cs @@ -95,8 +95,5 @@ void BatchItemError( /// /// The number of batches that have been dispatched. /// - /// - /// Indicates whether the batches have been dispatched in parallel. - /// - void BatchDispatched(int dispatchedBatches, bool inParallel); + void BatchDispatched(int dispatchedBatches); } diff --git a/src/HotChocolate/AspNetCore/benchmarks/k6/Catalog.API/Program.cs b/src/HotChocolate/AspNetCore/benchmarks/k6/Catalog.API/Program.cs index 9b529304ea5..c3c269c588d 100644 --- a/src/HotChocolate/AspNetCore/benchmarks/k6/Catalog.API/Program.cs +++ b/src/HotChocolate/AspNetCore/benchmarks/k6/Catalog.API/Program.cs @@ -35,8 +35,6 @@ .AddDefaultBatchDispatcher( new HotChocolate.Fetching.BatchDispatcherOptions { - EnableParallelBatches = false, - MaxParallelBatches = 4, MaxBatchWaitTimeUs = 50_000 }) .AddDiagnosticEventListener() diff --git a/src/HotChocolate/Core/src/Types/Fetching/BatchDispatcher.cs b/src/HotChocolate/Core/src/Types/Fetching/BatchDispatcher.cs index e159154bd2d..4855faef755 100644 --- a/src/HotChocolate/Core/src/Types/Fetching/BatchDispatcher.cs +++ b/src/HotChocolate/Core/src/Types/Fetching/BatchDispatcher.cs @@ -28,6 +28,7 @@ public sealed partial class BatchDispatcher : IBatchDispatcher private readonly IDataLoaderDiagnosticEvents _diagnosticEvents; private readonly BatchDispatcherOptions _options; private List? _dispatchTasks; + private List? _inFlightDispatches; private int _openBatches; private long _lastSubscribed; private long _lastEnqueued; @@ -43,6 +44,15 @@ public BatchDispatcher(IDataLoaderDiagnosticEvents diagnosticEvents, BatchDispat ArgumentNullException.ThrowIfNull(diagnosticEvents); _diagnosticEvents = diagnosticEvents; + + // Guard against `default(BatchDispatcherOptions)` which zeroes all fields, + // bypassing the struct's field initializers and silently disabling age-based + // forced dispatch. + if (options.MaxBatchWaitTimeUs == 0) + { + options.MaxBatchWaitTimeUs = 50_000; + } + _options = options; _coordinatorCts.Token.Register(_signal.Set); } @@ -91,7 +101,8 @@ private async Task CoordinatorAsync(CancellationToken stoppingToken) using var scope = _diagnosticEvents.RunBatchDispatchCoordinator(); var backlog = new PriorityQueue(); - _dispatchTasks ??= new List(_options.MaxParallelBatches); + _dispatchTasks ??= new List(4); + _inFlightDispatches ??= new List(4); Send(BatchDispatchEventType.CoordinatorStarted); @@ -106,7 +117,11 @@ private async Task CoordinatorAsync(CancellationToken stoppingToken) return; } - await EvaluateAndDispatchAsync(backlog, _dispatchTasks, stoppingToken); + await EvaluateAndDispatchAsync( + backlog, + _dispatchTasks, + _inFlightDispatches, + stoppingToken); } } catch (Exception ex) @@ -124,17 +139,30 @@ private async Task CoordinatorAsync(CancellationToken stoppingToken) private async Task EvaluateAndDispatchAsync( PriorityQueue backlog, List dispatchTasks, + List inFlightDispatches, CancellationToken stoppingToken) { - var noDispatchCycles = 0; + var idleCycles = 0; while (!stoppingToken.IsCancellationRequested) { + var completedDispatches = await CompleteInFlightDispatchesAsync(inFlightDispatches) + .ConfigureAwait(false); + + if (completedDispatches > 0) + { + _diagnosticEvents.BatchDispatched(completedDispatches); + Send(BatchDispatchEventType.Dispatched); + idleCycles = 0; + } + var openBatches = Volatile.Read(ref _openBatches); long lastModified = 0; - // If we have no open batches to evaluate, we can stop - // and wait for another signal. + // If we have no open batches to evaluate and all in-flight dispatches + // are completed, we can stop and wait for another signal. + // If there are in-flight dispatches still running we also stop and + // wait for their completion signal. if (openBatches == 0) { return; @@ -146,35 +174,13 @@ private async Task EvaluateAndDispatchAsync( EvaluateOpenBatches(ref lastModified, backlog, dispatchTasks); - // If the evaluation selected batches for dispatch. + // If the evaluation selected batches for dispatch, we register them + // as in-flight and continue evaluation without waiting for completion. if (dispatchTasks.Count > 0) { - // We wait for all dispatch tasks to be completed before we reset the signal - // that lets us pause the evaluation. Only then will we send a message to - // the subscribed executors to reevaluate if they can continue execution. - if (dispatchTasks.Count == 1) - { - await dispatchTasks[0]; - } - else - { - if (_options.EnableParallelBatches) - { - await Task.WhenAll(dispatchTasks); - } - else - { - foreach (var task in dispatchTasks) - { - await task.ConfigureAwait(false); - } - } - } - - _diagnosticEvents.BatchDispatched(dispatchTasks.Count, _options.EnableParallelBatches); - _signal.TryResetToIdle(); - Send(BatchDispatchEventType.Dispatched); - return; + RegisterInFlightDispatches(dispatchTasks, inFlightDispatches); + idleCycles = 0; + continue; } // Signal that we have evaluated all enqueued tasks without dispatching any. @@ -185,17 +191,58 @@ private async Task EvaluateAndDispatchAsync( // data requirements to the open batches. await WaitForMoreBatchActivityAsync(lastModified); - // After 10 cycles without dispatch, we add a small delay to provide backpressure. - if (noDispatchCycles >= 10) + // After 10 cycles without dispatch, insert a delay to avoid busy-spinning. + // The first 10 cycles run tight (only the conditional yield in + // WaitForMoreBatchActivityAsync) to give resolvers time to fill batches. + if (idleCycles++ >= 10) { await Task.Delay(10, stoppingToken); - noDispatchCycles = 0; + idleCycles = 0; + } + } + } + + private void RegisterInFlightDispatches( + List dispatchTasks, + List inFlightDispatches) + { + foreach (var dispatchTask in dispatchTasks) + { + if (!dispatchTask.IsCompleted) + { + _ = dispatchTask.ContinueWith( + static (_, state) => ((AsyncAutoResetEvent)state!).Set(), + _signal, + CancellationToken.None, + TaskContinuationOptions.ExecuteSynchronously, + TaskScheduler.Default); } - noDispatchCycles++; + inFlightDispatches.Add(dispatchTask); } } + private static async Task CompleteInFlightDispatchesAsync(List inFlightDispatches) + { + var completedDispatches = 0; + + for (var i = inFlightDispatches.Count - 1; i >= 0; i--) + { + var dispatchTask = inFlightDispatches[i]; + + if (!dispatchTask.IsCompleted) + { + continue; + } + + await dispatchTask.ConfigureAwait(false); + inFlightDispatches.RemoveAt(i); + completedDispatches++; + } + + return completedDispatches; + } + private void EvaluateOpenBatches( ref long lastModified, PriorityQueue backlog, @@ -231,7 +278,7 @@ private void EvaluateOpenBatches( // we force dispatch it regardless of its status to prevent starvation // under continuous high load. // - // We stop evaluation once we've dispatched MaxParallelBatches or when we have touched all batches. + // We stop evaluation when we have touched all batches. if (singleBatch is not null) { // we have an optimized path if there is only a single batch to evaluate. @@ -250,7 +297,6 @@ private void EvaluateMultipleOpenBatches( { var now = Stopwatch.GetTimestamp(); var maxBatchAgeUs = _options.MaxBatchWaitTimeUs; - var maxParallelBatches = _options.MaxParallelBatches; while (backlog.TryDequeue(out var batch, out _)) { @@ -276,11 +322,6 @@ private void EvaluateMultipleOpenBatches( Interlocked.Decrement(ref _openBatches); dispatchTasks.Add(batch.DispatchAsync()); } - - if (dispatchTasks.Count == maxParallelBatches) - { - break; - } } } diff --git a/src/HotChocolate/Core/src/Types/Fetching/BatchDispatcherOptions.cs b/src/HotChocolate/Core/src/Types/Fetching/BatchDispatcherOptions.cs index 8090824cf62..1e5d38bfaa6 100644 --- a/src/HotChocolate/Core/src/Types/Fetching/BatchDispatcherOptions.cs +++ b/src/HotChocolate/Core/src/Types/Fetching/BatchDispatcherOptions.cs @@ -5,7 +5,6 @@ namespace HotChocolate.Fetching; /// public struct BatchDispatcherOptions { - private int _maxParallelBatches = 4; private long _maxBatchWaitTimeUs = 50_000; /// @@ -15,24 +14,6 @@ public BatchDispatcherOptions() { } - /// - /// Gets or sets a value indicating whether batches can be dispatched in parallel. - /// - public bool EnableParallelBatches { get; set; } = false; - - /// - /// Gets or sets the maximum number of batches that can be dispatched in parallel. - /// - public int MaxParallelBatches - { - readonly get => _maxParallelBatches; - set - { - ArgumentOutOfRangeException.ThrowIfLessThan(value, 2, nameof(MaxParallelBatches)); - _maxParallelBatches = value; - } - } - /// /// Gets or sets the maximum wait time in microseconds before a batch is forcefully dispatched. /// Disable max batch wait time by setting this value to 0. diff --git a/src/HotChocolate/Core/test/Fetching.Tests/BatchDispatcherTests.cs b/src/HotChocolate/Core/test/Fetching.Tests/BatchDispatcherTests.cs index 452c4f4066f..dd8474e08ef 100644 --- a/src/HotChocolate/Core/test/Fetching.Tests/BatchDispatcherTests.cs +++ b/src/HotChocolate/Core/test/Fetching.Tests/BatchDispatcherTests.cs @@ -85,6 +85,97 @@ public void Schedule_OneAction_ShouldRaiseTaskEnqueued() Assert.Equal(BatchDispatchEventType.Enqueued, observer.Events[0]); } + [Fact] + public async Task BeginDispatch_Allows_Nested_Batch_To_Dispatch_While_Parent_Is_InFlight() + { + // arrange + var observer = new TestObserver(); + var scheduler = new BatchDispatcher(new DataLoaderDiagnosticEventListener()); + using var session = scheduler.Subscribe(observer); + + var innerDispatched = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var outerCompleted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var innerBatch = new DelegatingBatch( + () => + { + innerDispatched.TrySetResult(); + return Task.CompletedTask; + }); + + var outerBatch = new DelegatingBatch( + async () => + { + scheduler.Schedule(innerBatch); + scheduler.BeginDispatch(); + await innerDispatched.Task; + outerCompleted.TrySetResult(); + }); + + scheduler.Schedule(outerBatch); + + // act + scheduler.BeginDispatch(); + var completedTask = await Task.WhenAny(outerCompleted.Task, Task.Delay(3_000)); + + // assert + scheduler.Dispose(); + Assert.Same(outerCompleted.Task, completedTask); + await outerCompleted.Task; + } + + [Fact] + public async Task BeginDispatch_Many_Concurrent_Nested_Batches_Do_Not_Deadlock() + { + // arrange — N outer batches each schedule and await a nested inner batch. + // Previously this would deadlock when N >= MaxParallelBatches because + // the capacity limit prevented nested batches from being dispatched + // while all slots were occupied by their parents. + const int batchCount = 8; + var scheduler = new BatchDispatcher(new DataLoaderDiagnosticEventListener()); + var observer = new TestObserver(); + using var session = scheduler.Subscribe(observer); + + var allCompleted = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var remaining = batchCount; + + for (var i = 0; i < batchCount; i++) + { + var innerDispatched = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var innerBatch = new DelegatingBatch( + () => + { + innerDispatched.TrySetResult(); + return Task.CompletedTask; + }); + + var outerBatch = new DelegatingBatch( + async () => + { + scheduler.Schedule(innerBatch); + scheduler.BeginDispatch(); + await innerDispatched.Task; + + if (Interlocked.Decrement(ref remaining) == 0) + { + allCompleted.TrySetResult(); + } + }); + + scheduler.Schedule(outerBatch); + } + + // act + scheduler.BeginDispatch(); + var completedTask = await Task.WhenAny(allCompleted.Task, Task.Delay(5_000)); + + // assert + scheduler.Dispose(); + Assert.Same(allCompleted.Task, completedTask); + await allCompleted.Task; + } + public class TestObserver : IObserver { public ImmutableList Events { get; private set; } = []; @@ -129,4 +220,36 @@ public override bool Touch() public override Task DispatchAsync() => Task.CompletedTask; } + + public class DelegatingBatch : Batch + { + private readonly Func _dispatch; + private BatchStatus _status = BatchStatus.Enqueued; + + public DelegatingBatch(Func dispatch) + { + _dispatch = dispatch; + } + + public override int Size => 1; + + public override BatchStatus Status => _status; + + public override long ModifiedTimestamp { get; } = Stopwatch.GetTimestamp(); + + public override long CreatedTimestamp { get; } = Stopwatch.GetTimestamp(); + + public override bool Touch() + { + if (_status is BatchStatus.Touched) + { + return true; + } + + _status = BatchStatus.Touched; + return false; + } + + public override Task DispatchAsync() => _dispatch(); + } } diff --git a/src/HotChocolate/Diagnostics/src/Diagnostics/Listeners/ActivityDataLoaderDiagnosticListener.cs b/src/HotChocolate/Diagnostics/src/Diagnostics/Listeners/ActivityDataLoaderDiagnosticListener.cs index f74ce5ecf91..a0eeffcedd6 100644 --- a/src/HotChocolate/Diagnostics/src/Diagnostics/Listeners/ActivityDataLoaderDiagnosticListener.cs +++ b/src/HotChocolate/Diagnostics/src/Diagnostics/Listeners/ActivityDataLoaderDiagnosticListener.cs @@ -63,14 +63,13 @@ public override void BatchEvaluated(int openBatches) })); } - public override void BatchDispatched(int dispatchedBatches, bool inParallel) + public override void BatchDispatched(int dispatchedBatches) { Activity.Current?.AddEvent(new ActivityEvent( "BatchDispatched", tags: new ActivityTagsCollection { - { "dispatchedBatches", dispatchedBatches }, - { "inParallel", inParallel } + { "dispatchedBatches", dispatchedBatches } })); } } diff --git a/src/HotChocolate/Diagnostics/test/Diagnostics.Tests/__snapshots__/QueryInstrumentationTests.Track_data_loader_events.snap b/src/HotChocolate/Diagnostics/test/Diagnostics.Tests/__snapshots__/QueryInstrumentationTests.Track_data_loader_events.snap index 8fd27a18059..04ebef7195e 100644 --- a/src/HotChocolate/Diagnostics/test/Diagnostics.Tests/__snapshots__/QueryInstrumentationTests.Track_data_loader_events.snap +++ b/src/HotChocolate/Diagnostics/test/Diagnostics.Tests/__snapshots__/QueryInstrumentationTests.Track_data_loader_events.snap @@ -93,10 +93,6 @@ { "Key": "dispatchedBatches", "Value": 1 - }, - { - "Key": "inParallel", - "Value": false } ] } diff --git a/src/HotChocolate/Diagnostics/test/Diagnostics.Tests/__snapshots__/QueryInstrumentationTests.Track_data_loader_events_with_keys.snap b/src/HotChocolate/Diagnostics/test/Diagnostics.Tests/__snapshots__/QueryInstrumentationTests.Track_data_loader_events_with_keys.snap index 8fd27a18059..04ebef7195e 100644 --- a/src/HotChocolate/Diagnostics/test/Diagnostics.Tests/__snapshots__/QueryInstrumentationTests.Track_data_loader_events_with_keys.snap +++ b/src/HotChocolate/Diagnostics/test/Diagnostics.Tests/__snapshots__/QueryInstrumentationTests.Track_data_loader_events_with_keys.snap @@ -93,10 +93,6 @@ { "Key": "dispatchedBatches", "Value": 1 - }, - { - "Key": "inParallel", - "Value": false } ] }