diff --git a/src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Execution/FusionRequestExecutor.cs b/src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Execution/FusionRequestExecutor.cs index a1c2dd9fcdf..c32c9b6943a 100644 --- a/src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Execution/FusionRequestExecutor.cs +++ b/src/HotChocolate/Fusion-vnext/src/Fusion.Execution/Execution/FusionRequestExecutor.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using System.Runtime.CompilerServices; using HotChocolate.Execution; using HotChocolate.Features; @@ -216,7 +217,7 @@ private async IAsyncEnumerable ExecuteBatchStream( var requestCount = requests.Count; var tasks = Interlocked.Exchange(ref _taskList, null) ?? new List(requestCount); - var completed = new List(); + var completed = new ConcurrentQueue(); for (var i = 0; i < requestCount; i++) { @@ -225,7 +226,7 @@ private async IAsyncEnumerable ExecuteBatchStream( var buffer = new OperationResult[Math.Min(16, requestCount)]; - while (tasks.Count > 0 || completed.Count > 0) + while (tasks.Count > 0 || !completed.IsEmpty) { var count = completed.TryDequeueRange(buffer); @@ -234,7 +235,7 @@ private async IAsyncEnumerable ExecuteBatchStream( yield return buffer[i]; } - if (completed.Count == 0 && tasks.Count > 0) + if (completed.IsEmpty && tasks.Count > 0) { await Task.WhenAny(tasks).ConfigureAwait(false); @@ -280,7 +281,7 @@ private static IOperationRequest WithServices( private async Task ExecuteBatchItemAsync( IOperationRequest request, int requestIndex, - List completed, + ConcurrentQueue completed, CancellationToken cancellationToken) { var result = await ExecuteAsync(request, requestIndex, cancellationToken).ConfigureAwait(false); @@ -289,7 +290,7 @@ private async Task ExecuteBatchItemAsync( private static async Task UnwrapBatchItemResultAsync( IExecutionResult result, - List completed, + ConcurrentQueue completed, CancellationToken cancellationToken) { switch (result) @@ -340,30 +341,17 @@ private static async Task UnwrapBatchItemResultAsync( public ValueTask DisposeAsync() => Schema.DisposeAsync(); } -file static class ListExtensions +file static class ConcurrentQueueExtensions { - public static void Enqueue(this List queue, T item) + public static int TryDequeueRange(this ConcurrentQueue queue, T[] buffer) { - lock (queue) - { - queue.Insert(0, item); - } - } + var i = 0; - public static int TryDequeueRange(this List queue, T[] buffer) - { - lock (queue) + while (i < buffer.Length && queue.TryDequeue(out var value)) { - var count = Math.Min(queue.Count, buffer.Length); - var j = 0; - - for (var i = count - 1; i >= 0; i--) - { - buffer[j++] = queue[i]; - queue.RemoveAt(i); - } - - return count; + buffer[i++] = value; } + + return i; } }