Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
0f2b546
Optimize Fusion execution batching group registration
michaelstaib Feb 25, 2026
7f5c2d3
Merge branch 'main' into perf/fusion-execution-batching-cache
michaelstaib Feb 25, 2026
64ac66b
Optimize execution scheduling with ready dependency queue
michaelstaib Feb 25, 2026
47f180c
Use hash set backlog for constant-time scheduler removals
michaelstaib Feb 25, 2026
5226b21
Use node-id slots for dependency tracking in execution state
michaelstaib Feb 25, 2026
45a6a0e
Optimize batched result fanout and dispatcher hot paths
michaelstaib Feb 25, 2026
6fa4c0d
Optimize variable-set materialization in fetch result store
michaelstaib Feb 25, 2026
dd5a2f7
Reduce fetch result store merge allocations
michaelstaib Feb 25, 2026
806c8c0
Skip unnecessary ready-node sorting in execution state
michaelstaib Feb 25, 2026
6ca6f79
Precompute fetch merge inputs outside result lock
michaelstaib Feb 25, 2026
1e35472
Use node-id array lookup in operation plan
michaelstaib Feb 25, 2026
f20a5d6
Merge branch 'main' into perf/fusion-execution-batching-cache
michaelstaib Feb 25, 2026
dc1873b
Simplify dispatcher batching group membership tracking
michaelstaib Feb 25, 2026
8c74aaa
Optimize dispatcher node-group lookup and target element buffers
michaelstaib Feb 25, 2026
601fef3
Add fast paths for multi-requirement variable deduping
michaelstaib Feb 25, 2026
ba9cca9
Track dispatcher group progress with node state slots
michaelstaib Feb 25, 2026
063e74c
Optimize execution state node bookkeeping
michaelstaib Feb 25, 2026
3e3f2d4
Streamline scheduling of ready execution nodes
michaelstaib Feb 25, 2026
e008909
Merge branch 'main' into perf/fusion-execution-batching-cache
michaelstaib Feb 25, 2026
9ca2223
Remove forced async continuations for pending dispatcher requests
michaelstaib Feb 25, 2026
5f028ae
Revert "Remove forced async continuations for pending dispatcher requ…
michaelstaib Feb 25, 2026
867a871
Revert "Streamline scheduling of ready execution nodes"
michaelstaib Feb 25, 2026
dd90067
Reduce execution state backlog setup overhead
michaelstaib Feb 25, 2026
43a54b6
Optimize leaf value mapping in result data mapper
michaelstaib Feb 25, 2026
304958c
Speed up simple requirement extraction and leaf mapping
michaelstaib Feb 25, 2026
0cb5242
polish
michaelstaib Feb 25, 2026
7744749
polish
michaelstaib Feb 25, 2026
02bb8bc
polish
michaelstaib Feb 25, 2026
85b2527
polish
michaelstaib Feb 25, 2026
11e0178
Merge branch 'main' into perf/fusion-execution-batching-cache
michaelstaib Feb 25, 2026
c6b5dc8
Fix Fusion test regressions in requirement dedupe
michaelstaib Feb 25, 2026
7ccfb8e
Merge branch 'main' into perf/fusion-execution-batching-cache
michaelstaib Feb 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,20 @@ public async ValueTask<ImmutableArray<SourceSchemaClientResponse>> ExecuteBatchA
var contentType = httpResponse.ContentHeaders.ContentType?.ToString() ?? "unknown";
var isSuccessful = httpResponse.IsSuccessStatusCode;

var nodeResponsesByNodeId = new Dictionary<int, NodeResponse>(requests.Length);
var nodeResponses = new NodeResponse[requests.Length];
var builder = ImmutableArray.CreateBuilder<SourceSchemaClientResponse>(requests.Length);

for (var i = 0; i < requests.Length; i++)
{
var nodeResponse = new NodeResponse(uri, contentType, isSuccessful);
nodeResponsesByNodeId[requests[i].Node.Id] = nodeResponse;
nodeResponses[i] = nodeResponse;
builder.Add(nodeResponse);
}

_ = ReadBatchStreamInBackgroundAsync(
context,
requests,
nodeResponsesByNodeId,
nodeResponses,
httpResponse,
cancellationToken);

Expand Down Expand Up @@ -290,7 +290,7 @@ private static VariableBatchRequest CreateVariableBatchRequest(
private async Task ReadBatchStreamInBackgroundAsync(
OperationPlanContext context,
ImmutableArray<SourceSchemaClientRequest> requests,
Dictionary<int, NodeResponse> nodeResponses,
NodeResponse[] nodeResponses,
GraphQLHttpResponse httpResponse,
CancellationToken cancellationToken)
{
Expand All @@ -311,15 +311,7 @@ private async Task ReadBatchStreamInBackgroundAsync(
}

var request = requests[requestIndex];

if (!nodeResponses.TryGetValue(request.Node.Id, out var nodeResponse))
{
result.Dispose();
throw new InvalidOperationException(
string.Format(
FusionExecutionResources.SourceSchemaHttpClient_NoResponseChannelForNode,
request.Node.Id));
}
var nodeResponse = nodeResponses[requestIndex];

var variableIndex = ResolveVariableIndex(request, result);

Expand All @@ -338,15 +330,17 @@ private async Task ReadBatchStreamInBackgroundAsync(

// Stream completed successfully. Complete all channels, failing any
// that never received results (fail-loud).
foreach (var (nodeId, nodeResponse) in nodeResponses)
for (var i = 0; i < nodeResponses.Length; i++)
{
var nodeResponse = nodeResponses[i];

if (!nodeResponse.HasReceivedResults)
{
nodeResponse.Complete(
new InvalidOperationException(
string.Format(
FusionExecutionResources.SourceSchemaHttpClient_NoResultForNode,
nodeId)));
requests[i].Node.Id)));
}
else
{
Expand All @@ -356,9 +350,9 @@ private async Task ReadBatchStreamInBackgroundAsync(
}
catch (Exception ex)
{
foreach (var nodeResponse in nodeResponses.Values)
for (var i = 0; i < nodeResponses.Length; i++)
{
nodeResponse.Complete(ex);
nodeResponses[i].Complete(ex);
}
}
finally
Expand Down Expand Up @@ -499,8 +493,12 @@ private void WriteResultToChannel(
ImmutableArray<Path> additionalPaths,
SourceResultDocument document)
{
var sourceSchemaResult = new SourceSchemaResult(path, document);
_configuration.OnSourceSchemaResult?.Invoke(context, node, sourceSchemaResult);
var sourceSchemaResult = additionalPaths.IsDefaultOrEmpty
? new SourceSchemaResult(path, document)
: new SourceSchemaResult(path, document, additionalPaths: additionalPaths);
var onSourceSchemaResult = _configuration.OnSourceSchemaResult;

onSourceSchemaResult?.Invoke(context, node, sourceSchemaResult);

if (!nodeResponse.TryWrite(sourceSchemaResult))
{
Expand All @@ -510,17 +508,15 @@ private void WriteResultToChannel(

nodeResponse.HasReceivedResults = true;

foreach (var additionalPath in additionalPaths)
if (onSourceSchemaResult is null || additionalPaths.IsDefaultOrEmpty)
{
var alias = sourceSchemaResult.WithPath(additionalPath);
_configuration.OnSourceSchemaResult?.Invoke(context, node, alias);
return;
}

if (!nodeResponse.TryWrite(alias))
{
// alias does not own the document (ownsDocument: false via WithPath),
// so no disposal needed for the alias itself.
return;
}
// Preserve callback behavior for all logical result paths without enqueueing aliases.
foreach (var additionalPath in additionalPaths)
{
onSourceSchemaResult(context, node, sourceSchemaResult.WithPath(additionalPath));
}
}

Expand Down
Loading
Loading