Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ public static IApplicationBuilder MapGraphQL(

applicationBuilder
.Use(MiddlewareFactory.CreateCancellationMiddleware())
.Use(MiddlewareFactory.CreateConcurrencyGateMiddleware(serverOptions.MaxConcurrentRequests))
.Use(MiddlewareFactory.CreateWebSocketSubscriptionMiddleware(executor, serverOptions))
.Use(MiddlewareFactory.CreateHttpPostMiddleware(executor, serverOptions))
.Use(MiddlewareFactory.CreateHttpMultipartMiddleware(executor, serverOptions, formOptions))
Expand Down Expand Up @@ -216,7 +215,6 @@ public static GraphQLHttpEndpointConventionBuilder MapGraphQLHttp(

requestPipeline
.Use(MiddlewareFactory.CreateCancellationMiddleware())
.Use(MiddlewareFactory.CreateConcurrencyGateMiddleware(serverOptions.MaxConcurrentRequests))
.Use(MiddlewareFactory.CreateHttpPostMiddleware(executor, serverOptions))
.Use(MiddlewareFactory.CreateHttpMultipartMiddleware(executor, serverOptions, formOptions))
.Use(MiddlewareFactory.CreateHttpGetMiddleware(executor, serverOptions))
Expand Down Expand Up @@ -377,7 +375,6 @@ public static IEndpointConventionBuilder MapGraphQLSchema(

requestPipeline
.Use(MiddlewareFactory.CreateCancellationMiddleware())
.Use(MiddlewareFactory.CreateConcurrencyGateMiddleware(serverOptions.MaxConcurrentRequests))
.Use(MiddlewareFactory.CreateHttpGetSchemaMiddleware(
executor, serverOptions, PathString.Empty, MiddlewareRoutingType.Explicit))
.Use(_ => context =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,6 @@ internal static Func<RequestDelegate, RequestDelegate> CreateCancellationMiddlew
};
}

internal static Func<RequestDelegate, RequestDelegate> CreateConcurrencyGateMiddleware(
int? maxConcurrentRequests)
{
if (maxConcurrentRequests is null or <= 0)
{
return next => next;
}

var semaphore = new SemaphoreSlim(maxConcurrentRequests.Value, maxConcurrentRequests.Value);

return next => async context =>
{
if (context.WebSockets.IsWebSocketRequest)
{
await next(context);
return;
}

await semaphore.WaitAsync(context.RequestAborted);

try
{
await next(context);
}
finally
{
semaphore.Release();
}
};
}

internal static Func<RequestDelegate, RequestDelegate> CreateWebSocketSubscriptionMiddleware(
HttpRequestExecutorProxy executor,
GraphQLServerOptions serverOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ public sealed class GraphQLServerOptions
public int MaxBatchSize { get; set; } = 1024;

/// <summary>
/// Gets or sets the maximum number of concurrent GraphQL requests that can be
/// Gets or sets the maximum number of concurrent GraphQL executions that can be
/// processed simultaneously. A value of <c>null</c> means unlimited. Defaults to 64.
/// </summary>
public int? MaxConcurrentRequests { get; set; } = 64;
public int? MaxConcurrentExecutions { get; set; } = 64;

internal GraphQLServerOptions Clone()
=> new()
Expand All @@ -88,6 +88,6 @@ internal GraphQLServerOptions Clone()
EnableSchemaRequests = EnableSchemaRequests,
Batching = Batching,
MaxBatchSize = MaxBatchSize,
MaxConcurrentRequests = MaxConcurrentRequests
MaxConcurrentExecutions = MaxConcurrentExecutions
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ private static IRequestExecutorBuilder AddGraphQLServerCore(
this IRequestExecutorBuilder builder,
int maxAllowedRequestSize = MaxAllowedRequestSize)
{
builder.ConfigureSchemaServices(s =>
builder.ConfigureSchemaServices((applicationServices, s) =>
{
s.TryAddSingleton(sp =>
{
Expand Down Expand Up @@ -154,6 +154,15 @@ private static IRequestExecutorBuilder AddGraphQLServerCore(
_ => new AggregateServerDiagnosticEventListener(listeners)
};
});

s.TryAddSingleton(schemaServices =>
{
var schemaName = schemaServices.GetRequiredService<ISchemaDefinition>().Name;
var serverOptions = applicationServices
.GetRequiredService<IOptionsMonitor<GraphQLServerOptions>>()
.Get(schemaName);
return new ExecutionConcurrencyGate(serverOptions.MaxConcurrentExecutions);
});
});

builder.Services.TryAddEnumerable(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace HotChocolate.Execution;

/// <summary>
/// Limits the number of GraphQL executions that can run simultaneously against a
/// single <see cref="IRequestExecutor"/>. The gate is shared by the request pipeline
/// (for queries, mutations, and subscription handshakes) and the subscription event
/// loop (for each inbound event), so that one in-flight execution equals one slot
/// regardless of transport.
/// </summary>
public sealed class ExecutionConcurrencyGate
{
private readonly SemaphoreSlim? _semaphore;

/// <summary>
/// Initializes a new instance of <see cref="ExecutionConcurrencyGate"/>.
/// </summary>
/// <param name="maxConcurrentExecutions">
/// The maximum number of concurrent executions allowed. A value of <c>null</c>
/// or <c>&lt;= 0</c> disables the gate.
/// </param>
public ExecutionConcurrencyGate(int? maxConcurrentExecutions)
{
if (maxConcurrentExecutions is { } max and > 0)
{
_semaphore = new SemaphoreSlim(max, max);
}
}

/// <summary>
/// Gets a value indicating whether the gate is enabled.
/// </summary>
public bool IsEnabled => _semaphore is not null;

/// <summary>
/// Asynchronously waits for a slot to become available.
/// </summary>
/// <param name="cancellationToken">
/// The cancellation token to observe while waiting.
/// </param>
public ValueTask WaitAsync(CancellationToken cancellationToken)
{
if (_semaphore is null)
{
return ValueTask.CompletedTask;
}

return new ValueTask(_semaphore.WaitAsync(cancellationToken));
}

/// <summary>
/// Releases a previously acquired slot.
/// </summary>
public void Release() => _semaphore?.Release();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public static class WellKnownRequestMiddleware
/// <summary>Gets the key for the OperationCacheMiddleware.</summary>
public const string OperationCacheMiddleware = "OperationCacheMiddleware";

/// <summary>Gets the key for the ConcurrencyGateMiddleware.</summary>
public const string ConcurrencyGateMiddleware = "ConcurrencyGateMiddleware";

/// <summary>Gets the key for the OperationExecutionMiddleware.</summary>
public const string OperationExecutionMiddleware = "OperationExecutionMiddleware";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ public static RequestMiddlewareConfiguration DocumentValidation
/// </summary>
public static RequestMiddlewareConfiguration SkipWarmupExecution
=> SkipWarmupExecutionMiddleware.Create();

/// <summary>
/// Gets the middleware configuration for limiting the number of concurrent
/// GraphQL executions against the current schema.
/// </summary>
public static RequestMiddlewareConfiguration ConcurrencyGate
=> ConcurrencyGateMiddleware.Create();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using Microsoft.Extensions.DependencyInjection;

namespace HotChocolate.Execution.Pipeline;

internal sealed class ConcurrencyGateMiddleware
{
private readonly RequestDelegate _next;
private readonly ExecutionConcurrencyGate _gate;

private ConcurrencyGateMiddleware(
RequestDelegate next,
ExecutionConcurrencyGate gate)
{
ArgumentNullException.ThrowIfNull(next);
ArgumentNullException.ThrowIfNull(gate);

_next = next;
_gate = gate;
}

public async ValueTask InvokeAsync(RequestContext context)
{
// Delegates to the schema-wide execution concurrency gate; see
// ExecutionConcurrencyGate for ordering and timeout semantics.
await _gate.WaitAsync(context.RequestAborted).ConfigureAwait(false);

try
{
await _next(context).ConfigureAwait(false);
}
finally
{
_gate.Release();
}
}

public static RequestMiddlewareConfiguration Create()
=> new RequestMiddlewareConfiguration(
(factoryContext, next) =>
{
var gate = factoryContext.SchemaServices.GetService<ExecutionConcurrencyGate>();

if (gate is null or { IsEnabled: false })
{
// No gate is configured — skip the middleware entirely so there is
// no per-request overhead on the hot path.
return context => next(context);
}

var middleware = new ConcurrencyGateMiddleware(next, gate);
return context => middleware.InvokeAsync(context);
},
WellKnownRequestMiddleware.ConcurrencyGateMiddleware);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using HotChocolate.Execution;
using HotChocolate.Execution.Instrumentation;
using HotChocolate.Execution.Options;
using HotChocolate.Execution.Processing;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.ObjectPool;
Expand All @@ -21,7 +22,9 @@ internal static IServiceCollection TryAddOperationExecutors(
sp.GetRootServiceProvider().GetRequiredService<ObjectPool<OperationContext>>(),
sp.GetRequiredService<QueryExecutor>(),
sp.GetRequiredService<IErrorHandler>(),
sp.GetRequiredService<IExecutionDiagnosticEvents>()));
sp.GetRequiredService<IExecutionDiagnosticEvents>(),
sp.GetService<ExecutionConcurrencyGate>(),
sp.GetRequiredService<IRequestExecutorOptionsAccessor>()));
return services;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,26 @@ public static IRequestExecutorBuilder UseOperationCache(
return builder.UseRequest(OperationCacheMiddleware.Create());
}

/// <summary>
/// Adds a middleware that will be used to limit the number of concurrent
/// GraphQL executions against the current schema. The gate counts every
/// execution (queries, mutations, subscription handshakes, and per-event
/// subscription executions) as a single slot.
/// </summary>
/// <param name="builder">
/// The <see cref="IRequestExecutorBuilder"/> that can be used to configure a schema and its execution.
/// </param>
/// <returns>
/// An <see cref="IRequestExecutorBuilder"/> that can be used to configure a schema and its execution.
/// </returns>
public static IRequestExecutorBuilder UseConcurrencyGate(
this IRequestExecutorBuilder builder)
{
ArgumentNullException.ThrowIfNull(builder);

return builder.UseRequest(CommonMiddleware.ConcurrencyGate);
}

/// <summary>
/// Adds a middleware that will be used to execute the operation.
/// </summary>
Expand Down Expand Up @@ -622,6 +642,7 @@ public static IRequestExecutorBuilder UsePersistedOperationPipeline(
.UseOperationResolver()
.UseSkipWarmupExecution()
.UseOperationVariableCoercion()
.UseConcurrencyGate()
.UseOperationExecution();
}

Expand All @@ -644,6 +665,7 @@ public static IRequestExecutorBuilder UseAutomaticPersistedOperationPipeline(
.UseOperationResolver()
.UseSkipWarmupExecution()
.UseOperationVariableCoercion()
.UseConcurrencyGate()
.UseOperationExecution();
}

Expand All @@ -659,6 +681,7 @@ internal static void AddDefaultPipeline(this IList<RequestMiddlewareConfiguratio
pipeline.Add(OperationResolverMiddleware.Create());
pipeline.Add(CommonMiddleware.SkipWarmupExecution);
pipeline.Add(OperationVariableCoercionMiddleware.Create());
pipeline.Add(CommonMiddleware.ConcurrencyGate);
pipeline.Add(OperationExecutionMiddleware.Create());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,18 @@ public void Initialize(
IVariableValueCollection variables,
object? rootValue,
Func<object?> resolveQueryRootValue,
int variableIndex = -1)
int variableIndex = -1,
CancellationToken requestAbortedOverride = default)
{
_requestContext = requestContext;
_schema = Unsafe.As<Schema>(requestContext.Schema);
_errorHandler = _schema.Services.GetRequiredService<IErrorHandler>();
_resolvers = scopedServices.GetRequiredService<ResolverProvider>();
_diagnosticEvents = _schema.Services.GetRequiredService<IExecutionDiagnosticEvents>();
_contextData = requestContext.ContextData;
_requestAborted = requestContext.RequestAborted;
_requestAborted = requestAbortedOverride.CanBeCanceled
? requestAbortedOverride
: requestContext.RequestAborted;
_operation = operation;
_variables = variables;
_services = scopedServices;
Expand Down
Loading
Loading