Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Collections.Concurrent;
using HotChocolate.Execution.Caching;
using HotChocolate.Execution.Instrumentation;
using HotChocolate.Execution.Processing;

namespace HotChocolate.Execution.Pipeline;

Expand All @@ -8,6 +10,8 @@ internal sealed class OperationCacheMiddleware
private readonly RequestDelegate _next;
private readonly IExecutionDiagnosticEvents _diagnosticEvents;
private readonly IPreparedOperationCache _operationCache;
private readonly ConcurrentDictionary<string, Lazy<TaskCompletionSource<Operation>>> _inFlightOperations =
new(StringComparer.Ordinal);

private OperationCacheMiddleware(
RequestDelegate next,
Expand All @@ -30,29 +34,104 @@ public async ValueTask InvokeAsync(RequestContext context)
if (documentId.IsEmpty)
{
await _next(context).ConfigureAwait(false);
return;
}

if (!context.TryGetOperationId(out var operationId))
{
operationId = context.CreateCacheId();
context.SetOperationId(operationId);
}

var isSingleFlightLeader = false;
Lazy<TaskCompletionSource<Operation>>? inFlightOperation = null;

if (_operationCache.TryGetOperation(operationId, out var operation))
{
context.SetOperation(operation);
_diagnosticEvents.RetrievedOperationFromCache(context);
}
else if (_inFlightOperations.TryGetValue(operationId, out inFlightOperation))
{
// Another request is already compiling this operation.
// Await the leader's result to avoid redundant compilation work.
var coalescedOperation = await inFlightOperation.Value.Task
.WaitAsync(context.RequestAborted)
.ConfigureAwait(false);
context.SetOperation(coalescedOperation);
}
else
{
var addToCache = true;
if (!context.TryGetOperationId(out var operationId))
// No operation is cached and no compilation is in progress.
// Use a Lazy<TCS> so that under burst conditions only one TCS is materialized
// even if multiple requests race through GetOrAdd concurrently.
inFlightOperation = new Lazy<TaskCompletionSource<Operation>>(
static () => new TaskCompletionSource<Operation>(
TaskCreationOptions.RunContinuationsAsynchronously));
var cachedInFlightOperation = _inFlightOperations.GetOrAdd(operationId, inFlightOperation);

if (ReferenceEquals(cachedInFlightOperation, inFlightOperation))
{
operationId = context.CreateCacheId();
context.SetOperationId(operationId);
// We won the race! This request is the single-flight leader
// responsible for compiling and signaling all followers.
isSingleFlightLeader = true;
context.Features.Set(inFlightOperation.Value);
}

if (_operationCache.TryGetOperation(operationId, out var operation))
else
{
context.SetOperation(operation);
addToCache = false;
_diagnosticEvents.RetrievedOperationFromCache(context);
// We lost the race! Another request claimed leadership between
// TryGetValue and GetOrAdd. So we simply await the leader's result.
var coalescedOperation = await cachedInFlightOperation.Value.Task
.WaitAsync(context.RequestAborted)
.ConfigureAwait(false);
context.SetOperation(coalescedOperation);
}
}

try
{
await _next(context).ConfigureAwait(false);
}
catch (Exception ex)
{
// Propagate the exception to all waiting followers.
if (isSingleFlightLeader && inFlightOperation is not null)
{
inFlightOperation.Value.TrySetException(ex);
}

if (addToCache && context.TryGetOperation(out operation))
throw;
}
finally
{
if (isSingleFlightLeader)
{
_operationCache.TryAddOperation(operation.Id, operation);
_diagnosticEvents.AddedOperationToCache(context);
// Guard against a faulty diagnostic event handler preventing cleanup.
// Without this, a throw from the cache or diagnostics would leak the
// in-flight entry, causing _inFlightOperations to grow indefinitely.
try
{
if (context.TryGetOperation(out operation))
{
// Cache the operation before removing the in-flight entry so that
// there is no window where the operation is in neither structure.
_operationCache.TryAddOperation(operation.Id, operation);
_diagnosticEvents.AddedOperationToCache(context);
inFlightOperation?.Value.TrySetResult(operation);
}
else if (inFlightOperation?.Value.Task.IsCompleted == false)
{
// The pipeline completed without producing an operation and without
// throwing. Signal followers so they do not hang indefinitely.
inFlightOperation.Value.TrySetException(
new InvalidOperationException(
"The operation compilation task completed without a result."));
}
}
finally
{
_inFlightOperations.TryRemove(operationId, out _);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,27 @@ public async ValueTask InvokeAsync(RequestContext context)
var documentInfo = context.OperationDocumentInfo;
if (documentInfo.Document is not null && documentInfo.IsValidated)
{
var inFlightOperation = context.Features.Get<TaskCompletionSource<Operation>>();

using (_diagnosticEvents.CompileOperation(context))
{
operation = _operationPlanner.Compile(
operationId ?? Guid.NewGuid().ToString("N"),
documentInfo.Hash.Value,
context.Request.OperationName,
documentInfo.Document,
context);
try
{
operation = _operationPlanner.Compile(
operationId ?? Guid.NewGuid().ToString("N"),
documentInfo.Hash.Value,
context.Request.OperationName,
documentInfo.Document,
context);

context.SetOperation(operation);
context.SetOperation(operation);
inFlightOperation?.TrySetResult(operation);
}
catch (Exception ex)
{
inFlightOperation?.TrySetException(ex);
throw;
}
}

await _next(context).ConfigureAwait(false);
Expand Down
Loading
Loading