Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Concurrent;
using HotChocolate.Caching.Memory;
using HotChocolate.Execution;
using HotChocolate.Fusion.Diagnostics;
Expand All @@ -10,6 +11,8 @@ internal sealed class OperationPlanCacheMiddleware
{
private readonly Cache<OperationPlan> _cache;
private readonly IFusionExecutionDiagnosticEvents _diagnosticEvents;
private readonly ConcurrentDictionary<string, Lazy<TaskCompletionSource<OperationPlan>>> _inFlightPlans =
new(StringComparer.Ordinal);

private OperationPlanCacheMiddleware(Cache<OperationPlan> cache, IFusionExecutionDiagnosticEvents diagnosticEvents)
{
Expand All @@ -32,29 +35,95 @@ public async ValueTask InvokeAsync(RequestContext context, RequestDelegate next)
: $"{documentInfo.Hash.Value}.{context.Request.OperationName ?? "Default"}";
context.SetOperationId(operationId);

var isPlanCached = false;
var isSingleFlightLeader = false;
Lazy<TaskCompletionSource<OperationPlan>>? inFlightPlan = null;

if (_cache.TryGet(operationId, out var plan))
{
context.SetOperationPlan(plan);
isPlanCached = true;
_diagnosticEvents.RetrievedOperationPlanFromCache(context, operationId);
}
else if (_inFlightPlans.TryGetValue(operationId, out inFlightPlan))
{
// Another request is already planning this operation.
// Await the leader's result to avoid redundant planning work.
var coalescedPlan = await inFlightPlan.Value.Task
.WaitAsync(context.RequestAborted)
.ConfigureAwait(false);
context.SetOperationPlan(coalescedPlan);
}
else
{
// No plan is cached and no planning is in progress.
// Use a Lazy<TCS> so that under burst conditions only one TCS is materialized
// even if multiple requests race through GetOrAdd concurrently.
inFlightPlan = new Lazy<TaskCompletionSource<OperationPlan>>(
static () => new TaskCompletionSource<OperationPlan>(
TaskCreationOptions.RunContinuationsAsynchronously));
var cachedInFlightPlan = _inFlightPlans.GetOrAdd(operationId, inFlightPlan);

await next(context);
if (ReferenceEquals(cachedInFlightPlan, inFlightPlan))
{
// We won the race! This request is the single-flight leader
// responsible for planning and signaling all followers.
isSingleFlightLeader = true;
context.Features.Set(inFlightPlan.Value);
}
else
{
// We lost the race! Another request claimed leadership between
// TryGetValue and GetOrAdd. So we simply await the leader's result.
var coalescedPlan = await cachedInFlightPlan.Value.Task
.WaitAsync(context.RequestAborted)
.ConfigureAwait(false);
context.SetOperationPlan(coalescedPlan);
}
}

if (!isPlanCached)
try
{
await next(context);
}
catch (Exception ex)
{
// We retrieve the execution plan from the context.
// If there is no execution plan, we can exit early as something must have
// gone wrong in the pipeline. If we get, however, an execution plan,
// we try to cache it.
var executionPlan = context.GetOperationPlan();
// Propagate the exception to all waiting followers.
if (isSingleFlightLeader && inFlightPlan is not null)
{
inFlightPlan.Value.TrySetException(ex);
}

if (executionPlan is not null)
throw;
}
finally
{
if (isSingleFlightLeader)
{
_cache.TryAdd(operationId, executionPlan);
_diagnosticEvents.AddedOperationPlanToCache(context, operationId);
// Guard against a faulty diagnostic event handler preventing cleanup.
// Without this, a throw from the cache or diagnostics would leak the
// in-flight entry, causing _inFlightPlans to grow indefinitely.
try
{
if (context.GetOperationPlan() is { } operationPlan)
{
// Cache the plan before removing the in-flight entry so that
// there is no window where the plan is in neither structure.
_cache.TryAdd(operationId, operationPlan);
_diagnosticEvents.AddedOperationPlanToCache(context, operationId);
inFlightPlan?.Value.TrySetResult(operationPlan);
}
else if (inFlightPlan?.Value.Task.IsCompleted == false)
{
// The pipeline completed without producing a plan and without
// throwing. Signal followers so they do not hang indefinitely.
inFlightPlan.Value.TrySetException(
new InvalidOperationException(
"The operation plan task completed without a result."));
}
}
finally
{
_inFlightPlans.TryRemove(operationId, out _);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ private void PlanOperation(
var operationShortHash = operationHash[..8];

using var scope = _diagnosticsEvents.PlanOperation(context, operationId);
var inFlightPlan = context.Features.Get<TaskCompletionSource<OperationPlan>>();

try
{
Expand All @@ -74,9 +75,11 @@ private void PlanOperation(
context.RequestAborted);
OnAfterPlanCompleted(operationDocumentInfo, operationPlan);
context.SetOperationPlan(operationPlan);
inFlightPlan?.TrySetResult(operationPlan);
}
catch (Exception ex)
{
inFlightPlan?.TrySetException(ex);
_diagnosticsEvents.PlanOperationError(context, operationId, ex);

throw;
Expand Down
Loading
Loading