Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
615289d
Fusion: reduce forwarded variable allocation overhead
michaelstaib Feb 25, 2026
55cdf5f
Fusion: reduce dispatcher batching submission overhead
michaelstaib Feb 25, 2026
68e3f55
Fusion: fast-path single partial result merge
michaelstaib Feb 25, 2026
fe5966b
Fusion: avoid duplicate error lookups in operation result streams
michaelstaib Feb 25, 2026
4ad7bcd
Fusion: skip error merge overhead when source results are clean
michaelstaib Feb 25, 2026
b208de1
Fusion: avoid pooled buffers for single source responses
michaelstaib Feb 26, 2026
eb9eacd
Revert "Fusion: avoid pooled buffers for single source responses"
michaelstaib Feb 26, 2026
224ffa6
Fusion: remove per-group node tracking in dispatcher
michaelstaib Feb 26, 2026
0c3d145
Fusion: clear dispatcher slots from plan node ids
michaelstaib Feb 26, 2026
ea2ed7a
Fusion: avoid temporary path nodes in result mapper
michaelstaib Feb 26, 2026
56b618a
Fusion: fast-path string dedupe in variable sets
michaelstaib Feb 26, 2026
03ffb25
Revert "Fusion: avoid temporary path nodes in result mapper"
michaelstaib Feb 26, 2026
d5ae897
Revert "Fusion: clear dispatcher slots from plan node ids"
michaelstaib Feb 26, 2026
52442f1
Revert "Fusion: remove per-group node tracking in dispatcher"
michaelstaib Feb 26, 2026
1abbd3d
Fusion: fast-path integer leaf value mapping
michaelstaib Feb 26, 2026
2ee203d
Fusion: fast-path float leaf value mapping
michaelstaib Feb 26, 2026
601c854
Revert "Fusion: fast-path float leaf value mapping"
michaelstaib Feb 26, 2026
d2dc9aa
Fusion: avoid path allocations in object field mapping
michaelstaib Feb 26, 2026
0ade67d
Revert "Fusion: avoid path allocations in object field mapping"
michaelstaib Feb 26, 2026
baad6e7
Fusion: cache small int value nodes in result mapping
michaelstaib Feb 26, 2026
2bf197b
Revert "Fusion: cache small int value nodes in result mapping"
michaelstaib Feb 26, 2026
75dde74
Fusion: cache numeric string value nodes
michaelstaib Feb 26, 2026
4f32f75
Fusion: fast-path two-requirement scalar tuple dedupe
michaelstaib Feb 26, 2026
631b5ae
Revert "Fusion: fast-path two-requirement scalar tuple dedupe"
michaelstaib Feb 26, 2026
0163825
Fusion: cache small int leaf value nodes
michaelstaib Feb 26, 2026
9813870
Revert "Fusion: cache small int leaf value nodes"
michaelstaib Feb 26, 2026
4dd6603
Fusion: fast-path value completion when no errors
michaelstaib Feb 26, 2026
76577a1
Revert "Fusion: fast-path value completion when no errors"
michaelstaib Feb 26, 2026
e3da2ee
Fusion: fast-path single-result source response buffering
michaelstaib Feb 26, 2026
fba80e7
Fusion: defer string requirement dedupe map allocation
michaelstaib Feb 26, 2026
4b9f767
Revert "Fusion: defer string requirement dedupe map allocation"
michaelstaib Feb 26, 2026
eba8459
Fusion: lazily allocate non-string requirement dedupe map
michaelstaib Feb 26, 2026
ec5ce07
Revert "Fusion: lazily allocate non-string requirement dedupe map"
michaelstaib Feb 26, 2026
3d38afc
Fusion: lazily compute batch response buffer capacity
michaelstaib Feb 26, 2026
ea50999
Revert "Fusion: lazily compute batch response buffer capacity"
michaelstaib Feb 26, 2026
ba532cc
Polish
michaelstaib Feb 26, 2026
36d9d8d
Merge branch 'main' into mst/fusion-perf-th
michaelstaib Feb 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Collections.Immutable;
using System.Runtime.InteropServices;
using HotChocolate.Fusion.Properties;
using HotChocolate.Language;
using static HotChocolate.Fusion.Execution.Clients.SourceSchemaClientCapabilities;
using static HotChocolate.Fusion.Properties.FusionExecutionResources;
Expand All @@ -21,6 +20,11 @@ internal sealed class SourceSchemaRequestDispatcher
: ISourceSchemaScheduler
, ISourceSchemaDispatcher
{
private const int NodeStateUnregistered = -1;
private const int NodeStatePending = 0;
private const int NodeStateSubmitted = 1;
private const int NodeStateSkipped = 2;

#if NET9_0_OR_GREATER
private readonly Lock _sync = new();
#else
Expand Down Expand Up @@ -179,7 +183,7 @@ public void RegisterGroup(int groupId, IReadOnlyList<int> nodeIds)
}

_groupByNodeIdSlots[nodeId] = groupId;
_nodeStateSlots[nodeId] = 0;
_nodeStateSlots[nodeId] = NodeStatePending;
}
}
}
Expand Down Expand Up @@ -415,7 +419,7 @@ private void RemoveGroup(GroupState group)
if ((uint)nodeId < (uint)_groupByNodeIdSlots.Length)
{
_groupByNodeIdSlots[nodeId] = -1;
_nodeStateSlots[nodeId] = -1;
_nodeStateSlots[nodeId] = NodeStateUnregistered;
}
}
}
Expand All @@ -432,7 +436,7 @@ private void ClearNodeIdSlots()
if ((uint)nodeId < (uint)_groupByNodeIdSlots.Length)
{
_groupByNodeIdSlots[nodeId] = -1;
_nodeStateSlots[nodeId] = -1;
_nodeStateSlots[nodeId] = NodeStateUnregistered;
}
}

Expand All @@ -456,7 +460,7 @@ private void EnsureNodeIdSlotCapacity(int minCapacity)
var groupByNodeIdSlots = new int[newCapacity];
var nodeStateSlots = new int[newCapacity];
Array.Fill(groupByNodeIdSlots, -1);
Array.Fill(nodeStateSlots, -1);
Array.Fill(nodeStateSlots, NodeStateUnregistered);

if (_groupByNodeIdSlots.Length > 0)
{
Expand All @@ -471,15 +475,15 @@ private void EnsureNodeIdSlotCapacity(int minCapacity)
private sealed class GroupState(int id, int initialCapacity)
{
private readonly List<int> _nodeIds = new(initialCapacity);
private readonly Dictionary<int, PendingRequest> _pendingRequests = new(initialCapacity);
private readonly List<PendingRequest> _pendingRequests = new(initialCapacity);
private int _remainingNodes;
private bool _dispatchCreated;

public int Id { get; } = id;

public IEnumerable<int> NodeIds => _nodeIds;

public IEnumerable<PendingRequest> PendingRequests => _pendingRequests.Values;
public IEnumerable<PendingRequest> PendingRequests => _pendingRequests;

public void RegisterNode(int nodeId)
{
Expand All @@ -493,35 +497,40 @@ public bool TrySubmit(
out PendingRequest? pendingRequest)
{
var nodeId = request.Node.Id;
var nodeState =
(uint)nodeId < (uint)nodeStateSlots.Length
? nodeStateSlots[nodeId]
: NodeStateUnregistered;

if (_pendingRequests.ContainsKey(nodeId))
if (nodeState == NodeStateSubmitted)
{
throw new InvalidOperationException(
string.Format(
SourceSchemaRequestDispatcher_DuplicateNodeSubmission,
nodeId));
}

if ((uint)nodeId >= (uint)nodeStateSlots.Length || nodeStateSlots[nodeId] != 0)
if (nodeState != NodeStatePending)
{
pendingRequest = null;
return false;
}

nodeStateSlots[nodeId] = 1;
nodeStateSlots[nodeId] = NodeStateSubmitted;
_remainingNodes--;

pendingRequest = new PendingRequest(request);
_pendingRequests.Add(nodeId, pendingRequest);
_pendingRequests.Add(pendingRequest);

return true;
}

public void Skip(int nodeId, int[] nodeStateSlots)
{
if ((uint)nodeId < (uint)nodeStateSlots.Length && nodeStateSlots[nodeId] == 0)
if ((uint)nodeId < (uint)nodeStateSlots.Length
&& nodeStateSlots[nodeId] == NodeStatePending)
{
nodeStateSlots[nodeId] = 1;
nodeStateSlots[nodeId] = NodeStateSkipped;
_remainingNodes--;
}
}
Expand All @@ -542,7 +551,7 @@ public bool TryCreateDispatch(out ImmutableArray<PendingRequest> pendingRequests
return true;
}

pendingRequests = [.. _pendingRequests.Values];
pendingRequests = [.. _pendingRequests];
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Buffers;
using System.Collections.Immutable;
using System.Runtime.InteropServices;
using HotChocolate.Fusion.Execution.Clients;

namespace HotChocolate.Fusion.Execution.Nodes;
Expand Down Expand Up @@ -127,6 +128,7 @@ protected override async ValueTask<ExecutionStatus> OnExecuteAsync(
var index = 0;
var bufferLength = 0;
SourceSchemaResult[]? buffer = null;
SourceSchemaResult? singleResult = null;
var hasSomeErrors = false;

try
Expand All @@ -145,14 +147,34 @@ protected override async ValueTask<ExecutionStatus> OnExecuteAsync(
totalPathCount += variables[i].AdditionalPaths.Length;
}

bufferLength = Math.Max(totalPathCount, 1);
buffer = ArrayPool<SourceSchemaResult>.Shared.Rent(bufferLength);
var initialBufferLength = Math.Max(totalPathCount, 2);

await foreach (var result in response.ReadAsResultStreamAsync(cancellationToken))
{
buffer[index++] = result;
// Store the first result without renting a buffer,
// since it might be the only one (e.g. a request-level error).
if (index == 0)
{
singleResult = result;
index = 1;
}
else
{
// Once we see a second result, we know there are multiple,
// so we rent a buffer and move the first result into it.
if (buffer is null)
{
bufferLength = initialBufferLength;
buffer = ArrayPool<SourceSchemaResult>.Shared.Rent(bufferLength);
buffer[0] = singleResult!;
}

if (result.HasErrors)
buffer[index++] = result;
}

// Parsing errors here allows the result store to reuse the cached value
// and avoids a second document lookup per result.
if (result.Errors is not null)
{
hasSomeErrors = true;
}
Expand Down Expand Up @@ -182,14 +204,42 @@ protected override async ValueTask<ExecutionStatus> OnExecuteAsync(
buffer.AsSpan(0, index).Clear();
ArrayPool<SourceSchemaResult>.Shared.Return(buffer);
}
else if (singleResult is not null)
{
singleResult.Dispose();
}

AddErrors(context, exception, variables, _responseNames);
return ExecutionStatus.Failed;
}

try
{
context.AddPartialResults(_source, buffer.AsSpan(0, index), _responseNames);
if (buffer is not null)
{
context.AddPartialResults(
_source,
buffer.AsSpan(0, index),
_responseNames,
hasSomeErrors);
}
else if (singleResult is not null)
{
var firstResult = singleResult;
context.AddPartialResults(
_source,
MemoryMarshal.CreateReadOnlySpan(ref firstResult, 1),
_responseNames,
hasSomeErrors);
}
else
{
context.AddPartialResults(
_source,
[],
_responseNames,
hasSomeErrors);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Expand All @@ -206,8 +256,11 @@ protected override async ValueTask<ExecutionStatus> OnExecuteAsync(
}
finally
{
buffer.AsSpan(0, index).Clear();
ArrayPool<SourceSchemaResult>.Shared.Return(buffer);
if (buffer is not null)
{
buffer.AsSpan(0, index).Clear();
ArrayPool<SourceSchemaResult>.Shared.Return(buffer);
}
}

return hasSomeErrors ? ExecutionStatus.PartialSuccess : ExecutionStatus.Success;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ protected override async ValueTask<ExecutionStatus> OnExecuteAsync(
var index = 0;
var bufferLength = 0;
SourceSchemaResult[]? buffer = null;
SourceSchemaResult? singleResult = null;
var hasSomeErrors = false;

try
Expand All @@ -152,14 +153,32 @@ protected override async ValueTask<ExecutionStatus> OnExecuteAsync(
totalPathCount += variables[i].AdditionalPaths.Length;
}

bufferLength = Math.Max(totalPathCount, 1);
buffer = ArrayPool<SourceSchemaResult>.Shared.Rent(bufferLength);
var initialBufferLength = Math.Max(totalPathCount, 2);

await foreach (var result in response.ReadAsResultStreamAsync(cancellationToken))
{
buffer[index++] = result;
// If there is only one response, we skip the buffer rental.
if (index == 0)
{
singleResult = result;
index = 1;
}
else
{
// If we have more than one response, we rent a buffer and move the first result into it.
if (buffer is null)
{
bufferLength = initialBufferLength;
buffer = ArrayPool<SourceSchemaResult>.Shared.Rent(bufferLength);
buffer[0] = singleResult!;
}

if (result.HasErrors)
buffer[index++] = result;
}

// Parsing errors here allows the result store to reuse the cached value
// and avoids a second document lookup per result.
if (result.Errors is not null)
{
hasSomeErrors = true;
}
Expand Down Expand Up @@ -189,14 +208,42 @@ protected override async ValueTask<ExecutionStatus> OnExecuteAsync(
buffer.AsSpan(0, index).Clear();
ArrayPool<SourceSchemaResult>.Shared.Return(buffer);
}
else if (singleResult is not null)
{
singleResult.Dispose();
}

AddErrors(context, exception, variables, _responseNames);
return ExecutionStatus.Failed;
}

try
{
context.AddPartialResults(_source, buffer.AsSpan(0, index), _responseNames);
if (buffer is not null)
{
context.AddPartialResults(
_source,
buffer.AsSpan(0, index),
_responseNames,
hasSomeErrors);
}
else if (singleResult is not null)
{
var firstResult = singleResult;
context.AddPartialResults(
_source,
MemoryMarshal.CreateReadOnlySpan(ref firstResult, 1),
_responseNames,
hasSomeErrors);
}
else
{
context.AddPartialResults(
_source,
[],
_responseNames,
hasSomeErrors);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Expand All @@ -213,8 +260,11 @@ protected override async ValueTask<ExecutionStatus> OnExecuteAsync(
}
finally
{
buffer.AsSpan(0, index).Clear();
ArrayPool<SourceSchemaResult>.Shared.Return(buffer);
if (buffer is not null)
{
buffer.AsSpan(0, index).Clear();
ArrayPool<SourceSchemaResult>.Shared.Return(buffer);
}
}

return hasSomeErrors ? ExecutionStatus.PartialSuccess : ExecutionStatus.Success;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,9 +260,11 @@ internal ImmutableArray<VariableValues> CreateVariableValueSets(
internal void AddPartialResults(
SelectionPath sourcePath,
ReadOnlySpan<SourceSchemaResult> results,
ReadOnlySpan<string> responseNames)
ReadOnlySpan<string> responseNames,
bool containsErrors = true)
{
var canExecutionContinue = _resultStore.AddPartialResults(sourcePath, results, responseNames);
var canExecutionContinue =
_resultStore.AddPartialResults(sourcePath, results, responseNames, containsErrors);

if (!canExecutionContinue)
{
Expand Down Expand Up @@ -369,15 +371,15 @@ internal OperationResult Complete(bool reusable = false)
return operationResult;
}

private List<ObjectFieldNode> GetPathThroughVariables(
private IReadOnlyList<ObjectFieldNode> GetPathThroughVariables(
ReadOnlySpan<string> forwardedVariables)
{
if (Variables.IsEmpty || forwardedVariables.Length == 0)
{
return [];
return Array.Empty<ObjectFieldNode>();
}

var variables = new List<ObjectFieldNode>();
var variables = new List<ObjectFieldNode>(forwardedVariables.Length);

foreach (var variableName in forwardedVariables)
{
Expand All @@ -400,7 +402,9 @@ private List<ObjectFieldNode> GetPathThroughVariables(
}
}

return variables;
return variables.Count == 0
? Array.Empty<ObjectFieldNode>()
: variables;
}

public ISourceSchemaClient GetClient(string schemaName, OperationType operationType)
Expand Down
Loading
Loading