Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using System.Runtime.CompilerServices;
using HotChocolate.Execution;
using HotChocolate.Features;
Expand Down Expand Up @@ -216,7 +217,7 @@ private async IAsyncEnumerable<OperationResult> ExecuteBatchStream(
var requestCount = requests.Count;
var tasks = Interlocked.Exchange(ref _taskList, null) ?? new List<Task>(requestCount);

var completed = new List<OperationResult>();
var completed = new ConcurrentQueue<OperationResult>();

for (var i = 0; i < requestCount; i++)
{
Expand All @@ -225,7 +226,7 @@ private async IAsyncEnumerable<OperationResult> ExecuteBatchStream(

var buffer = new OperationResult[Math.Min(16, requestCount)];

while (tasks.Count > 0 || completed.Count > 0)
while (tasks.Count > 0 || !completed.IsEmpty)
{
var count = completed.TryDequeueRange(buffer);

Expand All @@ -234,7 +235,7 @@ private async IAsyncEnumerable<OperationResult> ExecuteBatchStream(
yield return buffer[i];
}

if (completed.Count == 0 && tasks.Count > 0)
if (completed.IsEmpty && tasks.Count > 0)
{
await Task.WhenAny(tasks).ConfigureAwait(false);

Expand Down Expand Up @@ -280,7 +281,7 @@ private static IOperationRequest WithServices(
private async Task ExecuteBatchItemAsync(
IOperationRequest request,
int requestIndex,
List<OperationResult> completed,
ConcurrentQueue<OperationResult> completed,
CancellationToken cancellationToken)
{
var result = await ExecuteAsync(request, requestIndex, cancellationToken).ConfigureAwait(false);
Expand All @@ -289,7 +290,7 @@ private async Task ExecuteBatchItemAsync(

private static async Task UnwrapBatchItemResultAsync(
IExecutionResult result,
List<OperationResult> completed,
ConcurrentQueue<OperationResult> completed,
CancellationToken cancellationToken)
{
switch (result)
Expand Down Expand Up @@ -340,30 +341,17 @@ private static async Task UnwrapBatchItemResultAsync(
public ValueTask DisposeAsync() => Schema.DisposeAsync();
}

file static class ListExtensions
file static class ConcurrentQueueExtensions
{
public static void Enqueue<T>(this List<T> queue, T item)
public static int TryDequeueRange<T>(this ConcurrentQueue<T> queue, T[] buffer)
{
lock (queue)
{
queue.Insert(0, item);
}
}
var i = 0;

public static int TryDequeueRange<T>(this List<T> queue, T[] buffer)
{
lock (queue)
while (i < buffer.Length && queue.TryDequeue(out var value))
{
var count = Math.Min(queue.Count, buffer.Length);
var j = 0;

for (var i = count - 1; i >= 0; i--)
{
buffer[j++] = queue[i];
queue.RemoveAt(i);
}

return count;
buffer[i++] = value;
}

return i;
}
}
Loading