Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ public override void BatchEvaluated(int openBatches)
}

/// <inheritdoc />
public override void BatchDispatched(int dispatchedBatches, bool inParallel)
public override void BatchDispatched(int dispatchedBatches)
{
for (var i = 0; i < listeners.Length; i++)
{
listeners[i].BatchDispatched(dispatchedBatches, inParallel);
listeners[i].BatchDispatched(dispatchedBatches);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public virtual void BatchEvaluated(int openBatches)
{ }

/// <inheritdoc />
public virtual void BatchDispatched(int dispatchedBatches, bool inParallel)
public virtual void BatchDispatched(int dispatchedBatches)
{ }

private sealed class EmptyActivityScope : IDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,5 @@ void BatchItemError<TKey>(
/// <param name="dispatchedBatches">
/// The number of batches that have been dispatched.
/// </param>
/// <param name="inParallel">
/// Indicates whether the batches have been dispatched in parallel.
/// </param>
void BatchDispatched(int dispatchedBatches, bool inParallel);
void BatchDispatched(int dispatchedBatches);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
.AddDefaultBatchDispatcher(
new HotChocolate.Fetching.BatchDispatcherOptions
{
EnableParallelBatches = false,
MaxParallelBatches = 4,
MaxBatchWaitTimeUs = 50_000
})
.AddDiagnosticEventListener<BenchmarkDataLoaderDiagnosticEventListener>()
Expand Down
127 changes: 84 additions & 43 deletions src/HotChocolate/Core/src/Types/Fetching/BatchDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public sealed partial class BatchDispatcher : IBatchDispatcher
private readonly IDataLoaderDiagnosticEvents _diagnosticEvents;
private readonly BatchDispatcherOptions _options;
private List<Task>? _dispatchTasks;
private List<Task>? _inFlightDispatches;
private int _openBatches;
private long _lastSubscribed;
private long _lastEnqueued;
Expand All @@ -43,6 +44,15 @@ public BatchDispatcher(IDataLoaderDiagnosticEvents diagnosticEvents, BatchDispat
ArgumentNullException.ThrowIfNull(diagnosticEvents);

_diagnosticEvents = diagnosticEvents;

// Guard against `default(BatchDispatcherOptions)` which zeroes all fields,
// bypassing the struct's field initializers and silently disabling age-based
// forced dispatch.
if (options.MaxBatchWaitTimeUs == 0)
{
options.MaxBatchWaitTimeUs = 50_000;
}

_options = options;
_coordinatorCts.Token.Register(_signal.Set);
}
Expand Down Expand Up @@ -91,7 +101,8 @@ private async Task CoordinatorAsync(CancellationToken stoppingToken)
using var scope = _diagnosticEvents.RunBatchDispatchCoordinator();

var backlog = new PriorityQueue<Batch, long>();
_dispatchTasks ??= new List<Task>(_options.MaxParallelBatches);
_dispatchTasks ??= new List<Task>(4);
_inFlightDispatches ??= new List<Task>(4);

Send(BatchDispatchEventType.CoordinatorStarted);

Expand All @@ -106,7 +117,11 @@ private async Task CoordinatorAsync(CancellationToken stoppingToken)
return;
}

await EvaluateAndDispatchAsync(backlog, _dispatchTasks, stoppingToken);
await EvaluateAndDispatchAsync(
backlog,
_dispatchTasks,
_inFlightDispatches,
stoppingToken);
}
}
catch (Exception ex)
Expand All @@ -124,17 +139,30 @@ private async Task CoordinatorAsync(CancellationToken stoppingToken)
private async Task EvaluateAndDispatchAsync(
PriorityQueue<Batch, long> backlog,
List<Task> dispatchTasks,
List<Task> inFlightDispatches,
CancellationToken stoppingToken)
{
var noDispatchCycles = 0;
var idleCycles = 0;

while (!stoppingToken.IsCancellationRequested)
{
var completedDispatches = await CompleteInFlightDispatchesAsync(inFlightDispatches)
.ConfigureAwait(false);

if (completedDispatches > 0)
{
_diagnosticEvents.BatchDispatched(completedDispatches);
Send(BatchDispatchEventType.Dispatched);
idleCycles = 0;
}

var openBatches = Volatile.Read(ref _openBatches);
long lastModified = 0;

// If we have no open batches to evaluate, we can stop
// and wait for another signal.
// If we have no open batches to evaluate and all in-flight dispatches
// are completed, we can stop and wait for another signal.
// If there are in-flight dispatches still running we also stop and
// wait for their completion signal.
if (openBatches == 0)
{
return;
Expand All @@ -146,35 +174,13 @@ private async Task EvaluateAndDispatchAsync(

EvaluateOpenBatches(ref lastModified, backlog, dispatchTasks);

// If the evaluation selected batches for dispatch.
// If the evaluation selected batches for dispatch, we register them
// as in-flight and continue evaluation without waiting for completion.
if (dispatchTasks.Count > 0)
{
// We wait for all dispatch tasks to be completed before we reset the signal
// that lets us pause the evaluation. Only then will we send a message to
// the subscribed executors to reevaluate if they can continue execution.
if (dispatchTasks.Count == 1)
{
await dispatchTasks[0];
}
else
{
if (_options.EnableParallelBatches)
{
await Task.WhenAll(dispatchTasks);
}
else
{
foreach (var task in dispatchTasks)
{
await task.ConfigureAwait(false);
}
}
}

_diagnosticEvents.BatchDispatched(dispatchTasks.Count, _options.EnableParallelBatches);
_signal.TryResetToIdle();
Send(BatchDispatchEventType.Dispatched);
return;
RegisterInFlightDispatches(dispatchTasks, inFlightDispatches);
idleCycles = 0;
continue;
}

// Signal that we have evaluated all enqueued tasks without dispatching any.
Expand All @@ -185,17 +191,58 @@ private async Task EvaluateAndDispatchAsync(
// data requirements to the open batches.
await WaitForMoreBatchActivityAsync(lastModified);

// After 10 cycles without dispatch, we add a small delay to provide backpressure.
if (noDispatchCycles >= 10)
// After 10 cycles without dispatch, insert a delay to avoid busy-spinning.
// The first 10 cycles run tight (only the conditional yield in
// WaitForMoreBatchActivityAsync) to give resolvers time to fill batches.
if (idleCycles++ >= 10)
{
await Task.Delay(10, stoppingToken);
noDispatchCycles = 0;
idleCycles = 0;
}
}
}

private void RegisterInFlightDispatches(
List<Task> dispatchTasks,
List<Task> inFlightDispatches)
{
foreach (var dispatchTask in dispatchTasks)
{
if (!dispatchTask.IsCompleted)
{
_ = dispatchTask.ContinueWith(
static (_, state) => ((AsyncAutoResetEvent)state!).Set(),
_signal,
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}

noDispatchCycles++;
inFlightDispatches.Add(dispatchTask);
}
}

/// <summary>
/// Removes every already-completed task from <paramref name="inFlightDispatches"/>
/// and reports how many were removed. Completed tasks are awaited so that a
/// faulted dispatch surfaces its exception to the caller.
/// </summary>
/// <param name="inFlightDispatches">
/// The list of currently tracked dispatch tasks; completed entries are removed in place.
/// </param>
/// <returns>The number of dispatch tasks that had completed and were removed.</returns>
private static async Task<int> CompleteInFlightDispatchesAsync(List<Task> inFlightDispatches)
{
    var finishedCount = 0;

    // Walk the list backwards so RemoveAt never shifts an index we have yet to visit.
    var index = inFlightDispatches.Count - 1;

    while (index >= 0)
    {
        var task = inFlightDispatches[index];

        if (task.IsCompleted)
        {
            // Awaiting a completed task is cheap and observes any stored exception.
            await task.ConfigureAwait(false);
            inFlightDispatches.RemoveAt(index);
            finishedCount++;
        }

        index--;
    }

    return finishedCount;
}

private void EvaluateOpenBatches(
ref long lastModified,
PriorityQueue<Batch, long> backlog,
Expand Down Expand Up @@ -231,7 +278,7 @@ private void EvaluateOpenBatches(
// we force dispatch it regardless of its status to prevent starvation
// under continuous high load.
//
// We stop evaluation once we've dispatched MaxParallelBatches or when we have touched all batches.
// We stop evaluation when we have touched all batches.
if (singleBatch is not null)
{
// we have an optimized path if there is only a single batch to evaluate.
Expand All @@ -250,7 +297,6 @@ private void EvaluateMultipleOpenBatches(
{
var now = Stopwatch.GetTimestamp();
var maxBatchAgeUs = _options.MaxBatchWaitTimeUs;
var maxParallelBatches = _options.MaxParallelBatches;

while (backlog.TryDequeue(out var batch, out _))
{
Expand All @@ -276,11 +322,6 @@ private void EvaluateMultipleOpenBatches(
Interlocked.Decrement(ref _openBatches);
dispatchTasks.Add(batch.DispatchAsync());
}

if (dispatchTasks.Count == maxParallelBatches)
{
break;
}
}
}

Expand Down
19 changes: 0 additions & 19 deletions src/HotChocolate/Core/src/Types/Fetching/BatchDispatcherOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace HotChocolate.Fetching;
/// </summary>
public struct BatchDispatcherOptions
{
private int _maxParallelBatches = 4;
private long _maxBatchWaitTimeUs = 50_000;

/// <summary>
Expand All @@ -15,24 +14,6 @@ public BatchDispatcherOptions()
{
}

/// <summary>
/// Gets or sets a value indicating whether batches can be dispatched in parallel.
/// </summary>
public bool EnableParallelBatches { get; set; } = false;

/// <summary>
/// Gets or sets the maximum number of batches that can be dispatched in parallel.
/// </summary>
public int MaxParallelBatches
{
readonly get => _maxParallelBatches;
set
{
ArgumentOutOfRangeException.ThrowIfLessThan(value, 2, nameof(MaxParallelBatches));
_maxParallelBatches = value;
}
}

/// <summary>
/// Gets or sets the maximum wait time in microseconds before a batch is forcefully dispatched.
/// Disable max batch wait time by setting this value to 0.
Expand Down
Loading
Loading