From 9941db981d80be5062ac41251eebc8c43f1798aa Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Sun, 19 Apr 2026 15:10:29 +0200 Subject: [PATCH 1/3] Rework concurrency gate for subscriptions --- .../EndpointRouteBuilderExtensions.cs | 3 - .../AspNetCore.Pipeline/MiddlewareFactory.cs | 31 - .../Options/GraphQLServerOptions.cs | 6 +- ...teAspNetCoreServiceCollectionExtensions.cs | 11 +- .../Execution/ExecutionConcurrencyGate.cs | 54 ++ .../WellKnownRequestMiddleware.cs | 3 + .../Execution.Pipeline/CommonMiddleware.cs | 7 + .../ConcurrencyGateMiddleware.cs | 54 ++ ...ternalSchemaServiceCollectionExtensions.cs | 3 +- ...estExecutorBuilderExtensions.UseRequest.cs | 23 + .../SubscriptionExecutor.Subscription.cs | 35 +- .../Processing/SubscriptionExecutor.cs | 8 +- .../ConcurrencyGateMiddlewareTests.cs | 622 ++++++++++++++++++ ...FusionServerServiceCollectionExtensions.cs | 13 +- ...FusionGatewayBuilderExtensions.Pipeline.cs | 11 + .../Execution/OperationPlanExecutor.cs | 19 +- .../Execution/Pipeline/FusionMiddleware.cs | 4 + .../src/docs/fusion/v16/performance-tuning.md | 20 +- .../v16/migrating/migrate-from-15-to-16.md | 16 + 19 files changed, 887 insertions(+), 56 deletions(-) create mode 100644 src/HotChocolate/Core/src/Execution.Abstractions/Execution/ExecutionConcurrencyGate.cs create mode 100644 src/HotChocolate/Core/src/Execution.Pipeline/ConcurrencyGateMiddleware.cs create mode 100644 src/HotChocolate/Core/test/Execution.Tests/Pipeline/ConcurrencyGateMiddlewareTests.cs diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/Extensions/EndpointRouteBuilderExtensions.cs b/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/Extensions/EndpointRouteBuilderExtensions.cs index 36c9138d52e..ab81e24d398 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/Extensions/EndpointRouteBuilderExtensions.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/Extensions/EndpointRouteBuilderExtensions.cs @@ -133,7 +133,6 @@ public static IApplicationBuilder MapGraphQL( applicationBuilder .Use(MiddlewareFactory.CreateCancellationMiddleware()) - .Use(MiddlewareFactory.CreateConcurrencyGateMiddleware(serverOptions.MaxConcurrentRequests)) .Use(MiddlewareFactory.CreateWebSocketSubscriptionMiddleware(executor, serverOptions)) .Use(MiddlewareFactory.CreateHttpPostMiddleware(executor, serverOptions)) .Use(MiddlewareFactory.CreateHttpMultipartMiddleware(executor, serverOptions, formOptions)) @@ -216,7 +215,6 @@ public static GraphQLHttpEndpointConventionBuilder MapGraphQLHttp( requestPipeline .Use(MiddlewareFactory.CreateCancellationMiddleware()) - .Use(MiddlewareFactory.CreateConcurrencyGateMiddleware(serverOptions.MaxConcurrentRequests)) .Use(MiddlewareFactory.CreateHttpPostMiddleware(executor, serverOptions)) .Use(MiddlewareFactory.CreateHttpMultipartMiddleware(executor, serverOptions, formOptions)) .Use(MiddlewareFactory.CreateHttpGetMiddleware(executor, serverOptions)) @@ -377,7 +375,6 @@ public static IEndpointConventionBuilder MapGraphQLSchema( requestPipeline .Use(MiddlewareFactory.CreateCancellationMiddleware()) - .Use(MiddlewareFactory.CreateConcurrencyGateMiddleware(serverOptions.MaxConcurrentRequests)) .Use(MiddlewareFactory.CreateHttpGetSchemaMiddleware( executor, serverOptions, PathString.Empty, MiddlewareRoutingType.Explicit)) .Use(_ => context => diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/MiddlewareFactory.cs b/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/MiddlewareFactory.cs index 62a1cb6bf59..7c23d6a3785 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/MiddlewareFactory.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/MiddlewareFactory.cs @@ -29,37 +29,6 @@ internal static Func CreateCancellationMiddlew }; } - internal static Func CreateConcurrencyGateMiddleware( - int? maxConcurrentRequests) - { - if (maxConcurrentRequests is null or <= 0) - { - return next => next; - } - - var semaphore = new SemaphoreSlim(maxConcurrentRequests.Value, maxConcurrentRequests.Value); - - return next => async context => - { - if (context.WebSockets.IsWebSocketRequest) - { - await next(context); - return; - } - - await semaphore.WaitAsync(context.RequestAborted); - - try - { - await next(context); - } - finally - { - semaphore.Release(); - } - }; - } - internal static Func CreateWebSocketSubscriptionMiddleware( HttpRequestExecutorProxy executor, GraphQLServerOptions serverOptions) diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/Options/GraphQLServerOptions.cs b/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/Options/GraphQLServerOptions.cs index 48754d152fe..c2c1b0705c6 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/Options/GraphQLServerOptions.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore.Pipeline/Options/GraphQLServerOptions.cs @@ -65,10 +65,10 @@ public sealed class GraphQLServerOptions public int MaxBatchSize { get; set; } = 1024; /// - /// Gets or sets the maximum number of concurrent GraphQL requests that can be + /// Gets or sets the maximum number of concurrent GraphQL executions that can be /// processed simultaneously. A value of null means unlimited. Defaults to 64. /// - public int? MaxConcurrentRequests { get; set; } = 64; + public int? MaxConcurrentExecutions { get; set; } = 64; internal GraphQLServerOptions Clone() => new() @@ -88,6 +88,6 @@ internal GraphQLServerOptions Clone() EnableSchemaRequests = EnableSchemaRequests, Batching = Batching, MaxBatchSize = MaxBatchSize, - MaxConcurrentRequests = MaxConcurrentRequests + MaxConcurrentExecutions = MaxConcurrentExecutions }; } diff --git a/src/HotChocolate/AspNetCore/src/AspNetCore/Extensions/HotChocolateAspNetCoreServiceCollectionExtensions.cs b/src/HotChocolate/AspNetCore/src/AspNetCore/Extensions/HotChocolateAspNetCoreServiceCollectionExtensions.cs index a2603de0425..0ab1d1daa07 100644 --- a/src/HotChocolate/AspNetCore/src/AspNetCore/Extensions/HotChocolateAspNetCoreServiceCollectionExtensions.cs +++ b/src/HotChocolate/AspNetCore/src/AspNetCore/Extensions/HotChocolateAspNetCoreServiceCollectionExtensions.cs @@ -121,7 +121,7 @@ private static IRequestExecutorBuilder AddGraphQLServerCore( this IRequestExecutorBuilder builder, int maxAllowedRequestSize = MaxAllowedRequestSize) { - builder.ConfigureSchemaServices(s => + builder.ConfigureSchemaServices((applicationServices, s) => { s.TryAddSingleton(sp => { @@ -154,6 +154,15 @@ private static IRequestExecutorBuilder AddGraphQLServerCore( _ => new AggregateServerDiagnosticEventListener(listeners) }; }); + + s.TryAddSingleton(schemaServices => + { + var schemaName = schemaServices.GetRequiredService().Name; + var serverOptions = applicationServices + .GetRequiredService>() + .Get(schemaName); + return new ExecutionConcurrencyGate(serverOptions.MaxConcurrentExecutions); + }); }); builder.Services.TryAddEnumerable( diff --git a/src/HotChocolate/Core/src/Execution.Abstractions/Execution/ExecutionConcurrencyGate.cs b/src/HotChocolate/Core/src/Execution.Abstractions/Execution/ExecutionConcurrencyGate.cs new file mode 100644 index 00000000000..0cf761a85fd --- /dev/null +++ b/src/HotChocolate/Core/src/Execution.Abstractions/Execution/ExecutionConcurrencyGate.cs @@ -0,0 +1,54 @@ +namespace HotChocolate.Execution; + +/// +/// Limits the number of GraphQL executions that can run simultaneously against a +/// single . The gate is shared by the request pipeline +/// (for queries, mutations, and subscription handshakes) and the subscription event +/// loop (for each inbound event), so that one in-flight execution equals one slot +/// regardless of transport. +/// +public sealed class ExecutionConcurrencyGate +{ + private readonly SemaphoreSlim? _semaphore; + + /// + /// Initializes a new instance of . + /// + /// + /// The maximum number of concurrent executions allowed. A value of null + /// or <= 0 disables the gate. + /// + public ExecutionConcurrencyGate(int? maxConcurrentExecutions) + { + if (maxConcurrentExecutions is { } max and > 0) + { + _semaphore = new SemaphoreSlim(max, max); + } + } + + /// + /// Gets a value indicating whether the gate is enabled. + /// + public bool IsEnabled => _semaphore is not null; + + /// + /// Asynchronously waits for a slot to become available. + /// + /// + /// The cancellation token to observe while waiting. + /// + public ValueTask WaitAsync(CancellationToken cancellationToken) + { + if (_semaphore is null) + { + return ValueTask.CompletedTask; + } + + return new ValueTask(_semaphore.WaitAsync(cancellationToken)); + } + + /// + /// Releases a previously acquired slot. + /// + public void Release() => _semaphore?.Release(); +} diff --git a/src/HotChocolate/Core/src/Execution.Abstractions/WellKnownRequestMiddleware.cs b/src/HotChocolate/Core/src/Execution.Abstractions/WellKnownRequestMiddleware.cs index a1d09659c32..0f401e94aaa 100644 --- a/src/HotChocolate/Core/src/Execution.Abstractions/WellKnownRequestMiddleware.cs +++ b/src/HotChocolate/Core/src/Execution.Abstractions/WellKnownRequestMiddleware.cs @@ -26,6 +26,9 @@ public static class WellKnownRequestMiddleware /// Gets the key for the OperationCacheMiddleware. public const string OperationCacheMiddleware = "OperationCacheMiddleware"; + /// Gets the key for the ConcurrencyGateMiddleware. + public const string ConcurrencyGateMiddleware = "ConcurrencyGateMiddleware"; + /// Gets the key for the OperationExecutionMiddleware. public const string OperationExecutionMiddleware = "OperationExecutionMiddleware"; diff --git a/src/HotChocolate/Core/src/Execution.Pipeline/CommonMiddleware.cs b/src/HotChocolate/Core/src/Execution.Pipeline/CommonMiddleware.cs index 0ce8663a05f..c2520371dd3 100644 --- a/src/HotChocolate/Core/src/Execution.Pipeline/CommonMiddleware.cs +++ b/src/HotChocolate/Core/src/Execution.Pipeline/CommonMiddleware.cs @@ -41,4 +41,11 @@ public static RequestMiddlewareConfiguration DocumentValidation /// public static RequestMiddlewareConfiguration SkipWarmupExecution => SkipWarmupExecutionMiddleware.Create(); + + /// + /// Gets the middleware configuration for limiting the number of concurrent + /// GraphQL executions against the current schema. + /// + public static RequestMiddlewareConfiguration ConcurrencyGate + => ConcurrencyGateMiddleware.Create(); } diff --git a/src/HotChocolate/Core/src/Execution.Pipeline/ConcurrencyGateMiddleware.cs b/src/HotChocolate/Core/src/Execution.Pipeline/ConcurrencyGateMiddleware.cs new file mode 100644 index 00000000000..dae07c7f13b --- /dev/null +++ b/src/HotChocolate/Core/src/Execution.Pipeline/ConcurrencyGateMiddleware.cs @@ -0,0 +1,54 @@ +using Microsoft.Extensions.DependencyInjection; + +namespace HotChocolate.Execution.Pipeline; + +internal sealed class ConcurrencyGateMiddleware +{ + private readonly RequestDelegate _next; + private readonly ExecutionConcurrencyGate _gate; + + private ConcurrencyGateMiddleware( + RequestDelegate next, + ExecutionConcurrencyGate gate) + { + ArgumentNullException.ThrowIfNull(next); + ArgumentNullException.ThrowIfNull(gate); + + _next = next; + _gate = gate; + } + + public async ValueTask InvokeAsync(RequestContext context) + { + // Delegates to the schema-wide execution concurrency gate; see + // ExecutionConcurrencyGate for ordering and timeout semantics. + await _gate.WaitAsync(context.RequestAborted).ConfigureAwait(false); + + try + { + await _next(context).ConfigureAwait(false); + } + finally + { + _gate.Release(); + } + } + + public static RequestMiddlewareConfiguration Create() + => new RequestMiddlewareConfiguration( + (factoryContext, next) => + { + var gate = factoryContext.SchemaServices.GetService(); + + if (gate is null or { IsEnabled: false }) + { + // No gate is configured — skip the middleware entirely so there is + // no per-request overhead on the hot path. + return context => next(context); + } + + var middleware = new ConcurrencyGateMiddleware(next, gate); + return context => middleware.InvokeAsync(context); + }, + WellKnownRequestMiddleware.ConcurrencyGateMiddleware); +} diff --git a/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/InternalSchemaServiceCollectionExtensions.cs b/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/InternalSchemaServiceCollectionExtensions.cs index d6716c39838..a89073f3cf7 100644 --- a/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/InternalSchemaServiceCollectionExtensions.cs +++ b/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/InternalSchemaServiceCollectionExtensions.cs @@ -21,7 +21,8 @@ internal static IServiceCollection TryAddOperationExecutors( sp.GetRootServiceProvider().GetRequiredService>(), sp.GetRequiredService(), sp.GetRequiredService(), - sp.GetRequiredService())); + sp.GetRequiredService(), + sp.GetService())); return services; } diff --git a/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/RequestExecutorBuilderExtensions.UseRequest.cs b/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/RequestExecutorBuilderExtensions.UseRequest.cs index 6647072a5e6..c2a54fbe032 100644 --- a/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/RequestExecutorBuilderExtensions.UseRequest.cs +++ b/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/RequestExecutorBuilderExtensions.UseRequest.cs @@ -475,6 +475,26 @@ public static IRequestExecutorBuilder UseOperationCache( return builder.UseRequest(OperationCacheMiddleware.Create()); } + /// + /// Adds a middleware that will be used to limit the number of concurrent + /// GraphQL executions against the current schema. The gate counts every + /// execution (queries, mutations, subscription handshakes, and per-event + /// subscription executions) as a single slot. + /// + /// + /// The that can be used to configure a schema and its execution. + /// + /// + /// An that can be used to configure a schema and its execution. + /// + public static IRequestExecutorBuilder UseConcurrencyGate( + this IRequestExecutorBuilder builder) + { + ArgumentNullException.ThrowIfNull(builder); + + return builder.UseRequest(CommonMiddleware.ConcurrencyGate); + } + /// /// Adds a middleware that will be used to execute the operation. /// @@ -622,6 +642,7 @@ public static IRequestExecutorBuilder UsePersistedOperationPipeline( .UseOperationResolver() .UseSkipWarmupExecution() .UseOperationVariableCoercion() + .UseConcurrencyGate() .UseOperationExecution(); } @@ -644,6 +665,7 @@ public static IRequestExecutorBuilder UseAutomaticPersistedOperationPipeline( .UseOperationResolver() .UseSkipWarmupExecution() .UseOperationVariableCoercion() + .UseConcurrencyGate() .UseOperationExecution(); } @@ -659,6 +681,7 @@ internal static void AddDefaultPipeline(this IList resolveQueryRootValue, - IExecutionDiagnosticEvents diagnosticEvents) + IExecutionDiagnosticEvents diagnosticEvents, + ExecutionConcurrencyGate? concurrencyGate) { unchecked { @@ -48,6 +50,7 @@ private Subscription( _rootSelections = rootSelections; _resolveQueryRootValue = resolveQueryRootValue; _diagnosticEvents = diagnosticEvents; + _concurrencyGate = concurrencyGate; _errorHandler = _requestContext.Schema.Services.GetRequiredService(); } @@ -76,6 +79,11 @@ private Subscription( /// /// The internal diagnostic events to report telemetry. /// + /// + /// The per-schema concurrency gate that bounds the number of in-flight + /// executions. Each event acquires a slot before invoking the query + /// executor, mirroring the pipeline gate for queries and mutations. + /// /// /// Returns a new subscription instance. /// @@ -86,7 +94,8 @@ public static async ValueTask SubscribeAsync( ObjectType subscriptionType, SelectionSet rootSelections, Func resolveQueryRootValue, - IExecutionDiagnosticEvents diagnosticsEvents) + IExecutionDiagnosticEvents diagnosticsEvents, + ExecutionConcurrencyGate? concurrencyGate) { var subscription = new Subscription( operationContextPool, @@ -95,7 +104,8 @@ public static async ValueTask SubscribeAsync( subscriptionType, rootSelections, resolveQueryRootValue, - diagnosticsEvents); + diagnosticsEvents, + concurrencyGate); subscription._subscriptionScope = diagnosticsEvents.ExecuteSubscription(requestContext, subscription.Id); subscription._sourceStream = await subscription.SubscribeAsync().ConfigureAwait(false); @@ -154,10 +164,22 @@ private async Task OnEvent(object payload) serviceScope.ServiceProvider.InitializeDataLoaderScope(); - var operationContext = _operationContextPool.Get(); + // Per-event concurrency gate acquire. The slot is released in the finally + // block below so it is returned even if the query executor throws or the + // subscription is cancelled mid-execution. + var gateAcquired = false; + OperationContext? operationContext = null; try { + if (_concurrencyGate is { IsEnabled: true }) + { + await _concurrencyGate.WaitAsync(_requestContext.RequestAborted).ConfigureAwait(false); + gateAcquired = true; + } + + operationContext = _operationContextPool.Get(); + var eventServices = serviceScope.ServiceProvider; var dispatcher = eventServices.GetRequiredService(); @@ -212,6 +234,11 @@ private async Task OnEvent(object payload) { _operationContextPool.Return(operationContext); } + + if (gateAcquired) + { + _concurrencyGate!.Release(); + } } } diff --git a/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.cs b/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.cs index 88ea444de11..9d7d24b46d5 100644 --- a/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.cs +++ b/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.cs @@ -11,17 +11,20 @@ internal sealed partial class SubscriptionExecutor private readonly QueryExecutor _queryExecutor; private readonly IErrorHandler _errorHandler; private readonly IExecutionDiagnosticEvents _diagnosticEvents; + private readonly ExecutionConcurrencyGate? _concurrencyGate; public SubscriptionExecutor( ObjectPool operationContextPool, QueryExecutor queryExecutor, IErrorHandler errorHandler, - IExecutionDiagnosticEvents diagnosticEvents) + IExecutionDiagnosticEvents diagnosticEvents, + ExecutionConcurrencyGate? concurrencyGate = null) { _operationContextPool = operationContextPool; _queryExecutor = queryExecutor; _errorHandler = errorHandler; _diagnosticEvents = diagnosticEvents; + _concurrencyGate = concurrencyGate; } public async Task ExecuteAsync( @@ -58,7 +61,8 @@ public async Task ExecuteAsync( operation.RootType, selectionSet, resolveQueryValue, - _diagnosticEvents) + _diagnosticEvents, + _concurrencyGate) .ConfigureAwait(false); var response = new ResponseStream(() => subscription.ExecuteAsync()); diff --git a/src/HotChocolate/Core/test/Execution.Tests/Pipeline/ConcurrencyGateMiddlewareTests.cs b/src/HotChocolate/Core/test/Execution.Tests/Pipeline/ConcurrencyGateMiddlewareTests.cs new file mode 100644 index 00000000000..c37d994c7ed --- /dev/null +++ b/src/HotChocolate/Core/test/Execution.Tests/Pipeline/ConcurrencyGateMiddlewareTests.cs @@ -0,0 +1,622 @@ +using HotChocolate.Subscriptions; +using HotChocolate.Types; +using Microsoft.Extensions.DependencyInjection; + +namespace HotChocolate.Execution.Pipeline; + +public sealed class ConcurrencyGateMiddlewareTests +{ + [Fact] + public async Task Gate_Should_Be_Held_Inside_Operation_Execution_But_Not_Outside_When_Query_Is_Executed() + { + // arrange + // N=1 lets us observe slot state via a non-blocking probe acquire. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + ExecutionConcurrencyGate gate = null!; + var beforeHeld = false; + var insideHeld = false; + var afterHeld = false; + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 1))) + .UseDefaultPipeline() + .UseRequest( + (_, next) => async context => + { + beforeHeld = await IsSlotHeldAsync(gate); + await next(context); + afterHeld = await IsSlotHeldAsync(gate); + }, + key: "Outside", + before: WellKnownRequestMiddleware.ConcurrencyGateMiddleware, + allowMultiple: true) + .UseRequest( + (_, next) => async context => + { + insideHeld = await IsSlotHeldAsync(gate); + await next(context); + }, + key: "Inside", + before: WellKnownRequestMiddleware.OperationExecutionMiddleware, + allowMultiple: true) + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + gate = executor.Schema.Services.GetRequiredService(); + + // act + var result = await executor.ExecuteAsync("{ foo }", cts.Token); + + // assert + Assert.Empty(result.ExpectOperationResult().Errors); + Assert.False(beforeHeld); + Assert.True(insideHeld); + Assert.False(afterHeld); + } + + [Fact] + public async Task Gate_Should_Be_Held_Inside_Operation_Execution_When_Mutation_Is_Executed() + { + // arrange + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + ExecutionConcurrencyGate gate = null!; + var beforeHeld = false; + var insideHeld = false; + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .AddMutationType(d => d.Field("setFoo").Resolve("baz")) + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 1))) + .UseDefaultPipeline() + .UseRequest( + (_, next) => async context => + { + beforeHeld = await IsSlotHeldAsync(gate); + await next(context); + }, + key: "Outside", + before: WellKnownRequestMiddleware.ConcurrencyGateMiddleware, + allowMultiple: true) + .UseRequest( + (_, next) => async context => + { + insideHeld = await IsSlotHeldAsync(gate); + await next(context); + }, + key: "Inside", + before: WellKnownRequestMiddleware.OperationExecutionMiddleware, + allowMultiple: true) + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + gate = executor.Schema.Services.GetRequiredService(); + + // act + var result = await executor.ExecuteAsync("mutation { setFoo }", cts.Token); + + // assert + Assert.Empty(result.ExpectOperationResult().Errors); + Assert.False(beforeHeld); + Assert.True(insideHeld); + } + + [Fact] + public async Task Gate_Should_Be_Released_Between_Subscription_Handshake_And_Iteration() + { + // arrange + // N=1; subscribe, then submit a query while events are (potentially) flowing. + // The handshake must have released the slot by the time the IResponseStream is returned. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddInMemorySubscriptions() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .AddSubscriptionType() + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 1))) + .UseDefaultPipeline() + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var gate = executor.Schema.Services.GetRequiredService(); + + // act + // subscribe; this goes through the pipeline including the gate + var subscribe = await executor.ExecuteAsync("subscription { onMessage }", cts.Token); + await using var stream = subscribe.ExpectResponseStream(); + + // a subsequent query must be able to acquire the slot (i.e. handshake released it) + var queryResult = await executor.ExecuteAsync("{ foo }", cts.Token); + + // assert + Assert.Empty(queryResult.ExpectOperationResult().Errors); + Assert.False(await IsSlotHeldAsync(gate)); + } + + [Fact] + public async Task Query_Should_Wait_For_Available_Slot_When_Gate_Is_Saturated() + { + // arrange + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var first = new AsyncLatch(); + var second = new AsyncLatch(); + var third = new AsyncLatch(); + + var counter = 0; + var latches = new[] { first, second }; + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddQueryType(d => + { + d.Field("block").Resolve(async _ => + { + var index = Interlocked.Increment(ref counter) - 1; + if (index < latches.Length) + { + await latches[index].WaitAsync(cts.Token); + } + + return "ok"; + }); + d.Field("instant").Resolve("ok"); + }) + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 2))) + .UseDefaultPipeline() + .UseRequest( + (_, next) => async context => + { + if (context.Request.OperationName == "Third") + { + third.Signal(); + } + + await next(context); + }, + key: "ThirdProbe", + before: WellKnownRequestMiddleware.ConcurrencyGateMiddleware, + allowMultiple: true) + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + // act + // saturate with two slow queries + var firstTask = Task.Run( + () => executor.ExecuteAsync("query A { block }", cts.Token), + CancellationToken.None); + var secondTask = Task.Run( + () => executor.ExecuteAsync("query B { block }", cts.Token), + CancellationToken.None); + + var gate = executor.Schema.Services.GetRequiredService(); + await WaitUntilAsync(async () => await IsSlotHeldAsync(gate), cts.Token); + + var thirdTask = Task.Run( + () => executor.ExecuteAsync("query Third { instant }", cts.Token), + CancellationToken.None); + + // third has entered the pipeline but is still waiting on the gate + await third.WaitAsync(cts.Token); + await Task.Delay(100, cts.Token); + Assert.False(thirdTask.IsCompleted); + + // release one in-flight query — the third can now acquire a slot + first.Signal(); + + var thirdResult = await thirdTask; + Assert.Empty(thirdResult.ExpectOperationResult().Errors); + + // drain the other in-flight query + second.Signal(); + + var results = await Task.WhenAll(firstTask, secondTask); + Assert.All(results, r => Assert.Empty(r.ExpectOperationResult().Errors)); + } + + [Fact] + public async Task Slot_Should_Be_Released_When_Resolver_Throws() + { + // arrange + // N=1; a resolver throw must not leak the slot. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddQueryType(d => + { + d.Field("throw").Resolve(_ => throw new InvalidOperationException("boom")); + d.Field("ok").Resolve("ok"); + }) + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 1))) + .UseDefaultPipeline() + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var gate = executor.Schema.Services.GetRequiredService(); + + // act + var throwing = await executor.ExecuteAsync("{ throw }", cts.Token); + var throwingResult = throwing.ExpectOperationResult(); + + // assert + Assert.NotEmpty(throwingResult.Errors); + Assert.False(await IsSlotHeldAsync(gate)); + + // slot is still usable — subsequent queries proceed without blocking + var ok = await executor.ExecuteAsync("{ ok }", cts.Token); + Assert.Empty(ok.ExpectOperationResult().Errors); + Assert.False(await IsSlotHeldAsync(gate)); + } + + [Fact] + public async Task Slot_Should_Be_Released_When_Pipeline_Middleware_Throws_Inside_Gate() + { + // arrange + // N=1; force an exception from a middleware sitting inside the gate + // and verify the slot is returned by the gate's finally block. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddQueryType(d => d.Field("ok").Resolve("ok")) + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 1))) + .UseDefaultPipeline() + .UseRequest( + (_, _) => _ => throw new InvalidOperationException("pipeline failure"), + key: "Throwing", + before: WellKnownRequestMiddleware.OperationExecutionMiddleware, + allowMultiple: true) + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var gate = executor.Schema.Services.GetRequiredService(); + + // act + var result = await executor.ExecuteAsync("{ ok }", cts.Token); + + // assert + Assert.NotEmpty(result.ExpectOperationResult().Errors); + Assert.False(await IsSlotHeldAsync(gate)); + } + + [Fact] + public async Task Query_Should_Fail_With_Timeout_Error_When_Wait_Exceeds_ExecutionTimeout() + { + // arrange + // N=1, ExecutionTimeout = 200ms. Saturate the gate with a slow query + // and submit a second that must time out while waiting for the slot. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var hold = new AsyncLatch(); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddQueryType(d => + { + d.Field("block").Resolve(async _ => + { + await hold.WaitAsync(cts.Token); + return "ok"; + }); + d.Field("instant").Resolve("ok"); + }) + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 1))) + .ModifyRequestOptions(o => o.ExecutionTimeout = TimeSpan.FromMilliseconds(200)) + .UseDefaultPipeline() + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var gate = executor.Schema.Services.GetRequiredService(); + + // saturate the gate + var blocking = Task.Run( + () => executor.ExecuteAsync("{ block }", cts.Token), + CancellationToken.None); + await WaitUntilAsync(async () => await IsSlotHeldAsync(gate), cts.Token); + + // act + // second request must time out waiting on the gate + var timedOut = await executor.ExecuteAsync("{ instant }", cts.Token); + + // assert + // clean timeout error, and the slot is still held by the in-flight query (not leaked) + var timedOutResult = timedOut.ExpectOperationResult(); + Assert.Collection( + timedOutResult.Errors, + e => Assert.Equal(ErrorCodes.Execution.Timeout, e.Code)); + Assert.True(await IsSlotHeldAsync(gate)); + + // release the blocking query; its slot returns and the gate is empty again + hold.Signal(); + Assert.Empty((await blocking).ExpectOperationResult().Errors); + Assert.False(await IsSlotHeldAsync(gate)); + } + + [Fact] + public async Task Gate_Should_Be_Disabled_When_MaxConcurrentExecutions_Is_Null() + { + // arrange + // null limit disables the gate entirely; middleware is a pass-through. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: null))) + .UseDefaultPipeline() + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var gate = executor.Schema.Services.GetRequiredService(); + + // act + var result = await executor.ExecuteAsync("{ foo }", cts.Token); + + // assert + Assert.False(gate.IsEnabled); + Assert.Empty(result.ExpectOperationResult().Errors); + } + + [Fact] + public async Task Gate_Should_Be_Disabled_When_MaxConcurrentExecutions_Is_Zero() + { + // arrange + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 0))) + .UseDefaultPipeline() + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var gate = executor.Schema.Services.GetRequiredService(); + + // act + var result = await executor.ExecuteAsync("{ foo }", cts.Token); + + // assert + Assert.False(gate.IsEnabled); + Assert.Empty(result.ExpectOperationResult().Errors); + } + + [Fact] + public async Task Subscription_Event_Should_Wait_For_Slot_When_Query_Is_In_Flight() + { + // arrange + // N=1. Subscribe first (handshake releases its slot), then saturate the gate + // with a blocking query. Send an event — it must not be processed until the query completes. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var queryGate = new AsyncLatch(); + var completionOrder = new List(); + var orderLock = new object(); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddInMemorySubscriptions() + .AddQueryType(d => + { + d.Field("block").Resolve(async _ => + { + await queryGate.WaitAsync(cts.Token); + lock (orderLock) + { + completionOrder.Add("query"); + } + + return "ok"; + }); + }) + .AddSubscriptionType() + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 1))) + .UseDefaultPipeline() + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var gate = executor.Schema.Services.GetRequiredService(); + var sender = executor.Schema.Services.GetRequiredService(); + + // subscribe + var subscribe = await executor.ExecuteAsync("subscription { onMessage }", cts.Token); + await using var stream = subscribe.ExpectResponseStream(); + var enumerator = stream.ReadResultsAsync().GetAsyncEnumerator(cts.Token); + + // saturate the slot with the blocking query + var blocking = Task.Run( + () => executor.ExecuteAsync("{ block }", cts.Token), + CancellationToken.None); + await WaitUntilAsync(async () => await IsSlotHeldAsync(gate), cts.Token); + + // act + // send an event; its execution must queue behind the query's slot + await sender.SendAsync("OnMessage", "one", cts.Token); + + var moveNext = enumerator.MoveNextAsync().AsTask(); + var delay = Task.Delay(150, cts.Token); + var won = await Task.WhenAny(moveNext, delay); + Assert.Same(delay, won); + + // release the blocking query — its slot is freed, the event executes, and the stream yields + queryGate.Signal(); + Assert.True(await moveNext); + lock (orderLock) + { + completionOrder.Add("event"); + } + + // assert + // query finished before the event could run + await blocking; + Assert.Equal(["query", "event"], completionOrder); + Assert.False(await IsSlotHeldAsync(gate)); + + await enumerator.DisposeAsync(); + } + + [Fact] + public async Task Subscription_Stream_Should_Not_Be_Drained_Ahead_Of_In_Flight_Execution() + { + // arrange + // fast source; one event held in execution. Assert that the source has yielded + // at most one event ahead of the currently processing event (pull-based back-pressure). + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + var yielded = 0; + var processed = 0; + var processingGate = new AsyncLatch(); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddInMemorySubscriptions() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .AddSubscriptionType() + .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 10))) + .UseDefaultPipeline() + .Services + .AddSingleton(new CountingSource( + () => Interlocked.Increment(ref yielded), + async ct => + { + Interlocked.Increment(ref processed); + await processingGate.WaitAsync(ct); + })) + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + // act + var subscribe = await executor.ExecuteAsync("subscription { onTick }", cts.Token); + await using var stream = subscribe.ExpectResponseStream(); + var enumerator = stream.ReadResultsAsync().GetAsyncEnumerator(cts.Token); + var moveNext = enumerator.MoveNextAsync().AsTask(); + + await WaitUntilAsync(() => Volatile.Read(ref processed) >= 1, cts.Token); + await Task.Delay(100, cts.Token); + + // assert + // yielded <= processed + 1 (at most one event is pulled ahead of the in-flight execution) + var yieldedSnapshot = Volatile.Read(ref yielded); + var processedSnapshot = Volatile.Read(ref processed); + Assert.Equal(1, processedSnapshot); + Assert.True( + yieldedSnapshot <= processedSnapshot + 1, + $"Expected yielded <= processed + 1, but yielded={yieldedSnapshot} and processed={processedSnapshot}."); + + // unblock and observe the event surfacing + processingGate.Signal(); + Assert.True(await moveNext); + + await enumerator.DisposeAsync(); + } + + /// + /// Behavioral probe: try to enter the gate with a zero timeout. If the gate is fully held, + /// the wait returns false; otherwise it acquires a slot we must return immediately. + /// + private static async Task IsSlotHeldAsync(ExecutionConcurrencyGate gate) + { + if (!gate.IsEnabled) + { + return false; + } + + var semaphore = GetSemaphore(gate); + if (await semaphore.WaitAsync(0).ConfigureAwait(false)) + { + semaphore.Release(); + // We got a slot — so at least one was free, which means not *all* slots are held. + // For N=1 this means the gate is free; for N>1 this just means it is not saturated. + return false; + } + + return true; + } + + private static SemaphoreSlim GetSemaphore(ExecutionConcurrencyGate gate) + { + // The gate exposes WaitAsync/Release but does not expose the underlying primitive. + // Tests need a non-blocking probe; the least invasive option is to read the private field. + var field = typeof(ExecutionConcurrencyGate).GetField( + "_semaphore", + System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic); + return (SemaphoreSlim)field!.GetValue(gate)!; + } + + private static async Task WaitUntilAsync(Func predicate, CancellationToken cancellationToken) + { + while (!predicate()) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Delay(10, cancellationToken).ConfigureAwait(false); + } + } + + private static async Task WaitUntilAsync(Func> predicate, CancellationToken cancellationToken) + { + while (!await predicate().ConfigureAwait(false)) + { + cancellationToken.ThrowIfCancellationRequested(); + await Task.Delay(10, cancellationToken).ConfigureAwait(false); + } + } + + public sealed class StringSubscription + { + [Subscribe] + public string OnMessage([EventMessage] string message) => message; + } + + public sealed class CountingSubscription + { + public async IAsyncEnumerable SubscribeToTicks( + [Service] CountingSource source, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var counter = 0; + + while (!cancellationToken.IsCancellationRequested) + { + source.OnYield(); + yield return ++counter; + } + } + + [Subscribe(With = nameof(SubscribeToTicks))] + public async Task OnTick( + [EventMessage] int tick, + [Service] CountingSource source, + CancellationToken cancellationToken) + { + await source.OnProcess(cancellationToken); + return tick; + } + } + + public sealed class CountingSource(Action onYield, Func onProcess) + { + public void OnYield() => onYield(); + + public Task OnProcess(CancellationToken ct) => onProcess(ct); + } + + private sealed class AsyncLatch + { + private readonly SemaphoreSlim _semaphore = new(0); + + public void Signal() => _semaphore.Release(); + + public Task WaitAsync(CancellationToken cancellationToken) + => _semaphore.WaitAsync(cancellationToken); + } +} diff --git a/src/HotChocolate/Fusion/src/Fusion.AspNetCore/DependencyInjection/FusionServerServiceCollectionExtensions.cs b/src/HotChocolate/Fusion/src/Fusion.AspNetCore/DependencyInjection/FusionServerServiceCollectionExtensions.cs index 0533795b30f..4614eb51e21 100644 --- a/src/HotChocolate/Fusion/src/Fusion.AspNetCore/DependencyInjection/FusionServerServiceCollectionExtensions.cs +++ b/src/HotChocolate/Fusion/src/Fusion.AspNetCore/DependencyInjection/FusionServerServiceCollectionExtensions.cs @@ -1,3 +1,4 @@ +using HotChocolate; using HotChocolate.AspNetCore; using HotChocolate.AspNetCore.Formatters; using HotChocolate.AspNetCore.Instrumentation; @@ -10,6 +11,7 @@ using HotChocolate.Fusion.Configuration; using HotChocolate.Language; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; namespace Microsoft.Extensions.DependencyInjection; @@ -35,7 +37,7 @@ private static IFusionGatewayBuilder AddGraphQLGatewayServerCore( this IFusionGatewayBuilder builder, int maxAllowedRequestSize = ServerDefaults.MaxAllowedRequestSize) { - builder.ConfigureSchemaServices((_, sc) => + builder.ConfigureSchemaServices((applicationServices, sc) => { sc.TryAddSingleton(); @@ -62,6 +64,15 @@ private static IFusionGatewayBuilder AddGraphQLGatewayServerCore( _ => new AggregateServerDiagnosticEventListener(listeners) }; }); + + sc.TryAddSingleton(schemaServices => + { + var schemaName = schemaServices.GetRequiredService().Name; + var serverOptions = applicationServices + .GetRequiredService>() + .Get(schemaName); + return new ExecutionConcurrencyGate(serverOptions.MaxConcurrentExecutions); + }); }); return builder; diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/CoreFusionGatewayBuilderExtensions.Pipeline.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/CoreFusionGatewayBuilderExtensions.Pipeline.cs index 6975321b7ff..369ea809fd7 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/CoreFusionGatewayBuilderExtensions.Pipeline.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/DependencyInjection/CoreFusionGatewayBuilderExtensions.Pipeline.cs @@ -71,6 +71,14 @@ public static IFusionGatewayBuilder UseOperationPlan( return builder.UseRequest(FusionMiddleware.OperationPlan); } + public static IFusionGatewayBuilder UseConcurrencyGate( + this IFusionGatewayBuilder builder) + { + ArgumentNullException.ThrowIfNull(builder); + + return builder.UseRequest(FusionMiddleware.ConcurrencyGate); + } + public static IFusionGatewayBuilder UseOperationExecution( this IFusionGatewayBuilder builder) { @@ -178,6 +186,7 @@ public static IFusionGatewayBuilder UseDefaultPipeline( .UseOperationPlan() .UseSkipWarmupExecution() .UseOperationVariableCoercion() + .UseConcurrencyGate() .UseOperationExecution(); } @@ -202,6 +211,7 @@ public static IFusionGatewayBuilder UsePersistedOperationPipeline( .UseOperationPlan() .UseSkipWarmupExecution() .UseOperationVariableCoercion() + .UseConcurrencyGate() .UseOperationExecution(); } @@ -226,6 +236,7 @@ public static IFusionGatewayBuilder UseAutomaticPersistedOperationPipeline( .UseOperationPlan() .UseSkipWarmupExecution() .UseOperationVariableCoercion() + .UseConcurrencyGate() .UseOperationExecution(); } } diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/OperationPlanExecutor.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/OperationPlanExecutor.cs index cfbb4fbdf79..65c65b7516d 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/OperationPlanExecutor.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/OperationPlanExecutor.cs @@ -85,10 +85,11 @@ public async Task SubscribeAsync( throw new InvalidOperationException("We could not subscribe to the underlying source schema."); } - var subscriptionEnumerable = CreateSubscriptionEnumerable( + var subscriptionEnumerable = CreateResponseStream( context, subscriptionNode, subscriptionResult, + requestContext.Schema.Services.GetService(), executionCts.Token, cancellationToken); @@ -220,10 +221,11 @@ private static async Task ExecuteMutationAsync( } } - private static async IAsyncEnumerable CreateSubscriptionEnumerable( + private static async IAsyncEnumerable CreateResponseStream( OperationPlanContext context, OperationExecutionNode subscriptionNode, SubscriptionResult subscriptionResult, + ExecutionConcurrencyGate? concurrencyGate, [EnumeratorCancellation] CancellationToken executionCancellationToken, CancellationToken requestCancellationToken) { @@ -247,8 +249,16 @@ private static async IAsyncEnumerable CreateSubscriptionEnumera OperationResult result; + var gateAcquired = false; + try { + if (concurrencyGate is { IsEnabled: true }) + { + await concurrencyGate.WaitAsync(requestCancellationToken).ConfigureAwait(false); + gateAcquired = true; + } + context.Begin(eventArgs.StartTimestamp, eventArgs.Activity?.TraceId.ToHexString()); executionState.Reset(); @@ -306,6 +316,11 @@ private static async IAsyncEnumerable CreateSubscriptionEnumera { // disposing the eventArgs disposes the telemetry scope. eventArgs.Dispose(); + + if (gateAcquired) + { + concurrencyGate!.Release(); + } } yield return result; diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Pipeline/FusionMiddleware.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Pipeline/FusionMiddleware.cs index d42647fec63..687c632aa20 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Pipeline/FusionMiddleware.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/Pipeline/FusionMiddleware.cs @@ -1,4 +1,5 @@ using HotChocolate.Execution; +using HotChocolate.Execution.Pipeline; namespace HotChocolate.Fusion.Execution.Pipeline; @@ -21,4 +22,7 @@ public static RequestMiddlewareConfiguration OperationVariableCoercion public static RequestMiddlewareConfiguration Timeout => TimeoutMiddleware.Create(); + + public static RequestMiddlewareConfiguration ConcurrencyGate + => CommonMiddleware.ConcurrencyGate; } diff --git a/website/src/docs/fusion/v16/performance-tuning.md b/website/src/docs/fusion/v16/performance-tuning.md index 4c45908330b..66ca9b2d266 100644 --- a/website/src/docs/fusion/v16/performance-tuning.md +++ b/website/src/docs/fusion/v16/performance-tuning.md @@ -146,11 +146,13 @@ Only **query operations** are deduplicated. The following are **not** deduplicat - **Subscriptions**: long-lived connections that are inherently unique. - **File uploads**: multipart requests bypass deduplication. -## Concurrent Request Limiting +## Concurrent Execution Limiting -The gateway limits the number of simultaneous HTTP requests it processes using a **concurrency gate**. Capping concurrency keeps the gateway operating in its optimal throughput range. Too many requests competing for the same resources (thread pool, memory, connections) can reduce overall throughput rather than increase it. +The gateway limits the number of simultaneous **executions** it processes using a **concurrency gate**. An execution is the work of running a single GraphQL operation end-to-end. Each query or mutation counts as one execution, and each event a subscription emits counts as one execution while its selection set runs. Capping concurrency keeps the gateway operating in its optimal throughput range. Too much work competing for the same resources (thread pool, memory, connections) can reduce overall throughput rather than increase it. -The default limit is **64** concurrent HTTP requests. WebSocket and subscription requests bypass this limit. Depending on how many CPU cores your system has, you may want to increase or decrease this value to find the optimal throughput for your hardware. This limit does not reject requests. Instead, it queues them, and the GraphQL executor processes at most 64 concurrent requests by default. +The default limit is **64 concurrent executions**. The default is calibrated for small containers 1 to 4 CPUs. Depending on your CPU count and typical operation cost, you may want to increase or decrease this value to find the optimal throughput for your hardware. The limit does not reject work; it queues it, and the GraphQL executor processes at most 64 executions concurrently by default. + +Subscriptions participate in this limit like any other operation. Each event the gateway processes consumes a slot. Idle subscriptions between events cost nothing. Set the limit through `ModifyServerOptions` on the gateway builder: @@ -160,7 +162,7 @@ builder .AddFileSystemConfiguration("./gateway.far") .ModifyServerOptions(options => { - options.MaxConcurrentRequests = 128; + options.MaxConcurrentExecutions = 128; }); ``` @@ -170,16 +172,18 @@ You can override this limit for a specific HTTP endpoint using `WithOptions`: app.MapGraphQLHttp() .WithOptions(options => { - options.MaxConcurrentRequests = 256; + options.MaxConcurrentExecutions = 256; }); ``` ### Tuning Guidance -- **Too low**: requests queue behind the concurrency gate, adding latency even when gateway and subgraphs have spare capacity. -- **Too high**: the gateway forwards more requests than it can efficiently process, leading to thread pool starvation and increased latency. +- **Too low**: executions queue behind the concurrency gate, adding latency even when the gateway and subgraphs have spare capacity. Subscriptions with high event rates feel this first. +- **Too high**: the gateway runs more work than it can efficiently process, leading to thread pool starvation and increased latency across queries, mutations, and subscription events. + +Start with the default of 64 and adjust based on your workload. If you expect many long-lived subscriptions firing frequent events, factor those into your sizing. They now contend for the same slots as queries and mutations. Set to `null` to disable the limit entirely. -Start with the default of 64 and adjust based on your workload. Set to `null` to disable the limit entirely. +Queued executions wait on the gate; the `ExecutionTimeout` setting (default 30 seconds) bounds how long they can wait before the request is cancelled with a clean timeout error. If you regularly see executions timing out at the gate, that's a signal to scale out or raise the limit rather than extend the timeout. ## Next Steps diff --git a/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md b/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md index 67aa21dced0..5c5c57ae463 100644 --- a/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md +++ b/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md @@ -1083,3 +1083,19 @@ builder .AddGraphQL() .SetMaxAllowedFieldMergeComparisons(200_000); ``` + +## Concurrent execution gate + +Hot Chocolate v16 introduces a concurrency gate that limits how many GraphQL operations execute at the same time. The gate sits in the request pipeline just before operation execution and applies uniformly to queries, mutations, subscription handshakes, and each subscription event. + +Configure the limit through `ModifyServerOptions`: + +```csharp +builder + .AddGraphQL() + .ModifyServerOptions(o => o.MaxConcurrentExecutions = 128); +``` + +The default is **64**. Operations that arrive while the gate is full queue up and run as slots free. The `ExecutionTimeout` setting (default 30 seconds) bounds how long an operation can wait before it is cancelled with a clean timeout error. Set the limit to `null` to disable the gate entirely. + +Subscriptions participate in the limit like any other operation. The initial subscribe consumes a slot while the subscribe resolver runs, and each emitted event consumes a slot while its result is being produced. Idle subscriptions (waiting on the next event) cost nothing. The slot is released between events. From fcb05f54457abe8f852854ed6c01bf7c37e5bca1 Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Mon, 20 Apr 2026 09:20:53 +0200 Subject: [PATCH 2/3] more fixes --- ...ternalSchemaServiceCollectionExtensions.cs | 4 +- .../Processing/OperationContext.Pooling.cs | 7 +- .../SubscriptionExecutor.Subscription.cs | 42 +++- .../Processing/SubscriptionExecutor.cs | 40 ++-- .../ConcurrencyGateMiddlewareTests.cs | 27 ++- .../Pipeline/SubscriptionEventTimeoutTests.cs | 195 ++++++++++++++++++ .../Execution/OperationPlanExecutor.cs | 164 +++++++++------ 7 files changed, 371 insertions(+), 108 deletions(-) create mode 100644 src/HotChocolate/Core/test/Execution.Tests/Pipeline/SubscriptionEventTimeoutTests.cs diff --git a/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/InternalSchemaServiceCollectionExtensions.cs b/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/InternalSchemaServiceCollectionExtensions.cs index a89073f3cf7..a0d6826f1cf 100644 --- a/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/InternalSchemaServiceCollectionExtensions.cs +++ b/src/HotChocolate/Core/src/Types/Execution/DependencyInjection/InternalSchemaServiceCollectionExtensions.cs @@ -1,5 +1,6 @@ using HotChocolate.Execution; using HotChocolate.Execution.Instrumentation; +using HotChocolate.Execution.Options; using HotChocolate.Execution.Processing; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.ObjectPool; @@ -22,7 +23,8 @@ internal static IServiceCollection TryAddOperationExecutors( sp.GetRequiredService(), sp.GetRequiredService(), sp.GetRequiredService(), - sp.GetService())); + sp.GetService(), + sp.GetRequiredService())); return services; } diff --git a/src/HotChocolate/Core/src/Types/Execution/Processing/OperationContext.Pooling.cs b/src/HotChocolate/Core/src/Types/Execution/Processing/OperationContext.Pooling.cs index de16c1a4d9b..bedb56a317f 100644 --- a/src/HotChocolate/Core/src/Types/Execution/Processing/OperationContext.Pooling.cs +++ b/src/HotChocolate/Core/src/Types/Execution/Processing/OperationContext.Pooling.cs @@ -70,7 +70,8 @@ public void Initialize( IVariableValueCollection variables, object? rootValue, Func resolveQueryRootValue, - int variableIndex = -1) + int variableIndex = -1, + CancellationToken requestAbortedOverride = default) { _requestContext = requestContext; _schema = Unsafe.As(requestContext.Schema); @@ -78,7 +79,9 @@ public void Initialize( _resolvers = scopedServices.GetRequiredService(); _diagnosticEvents = _schema.Services.GetRequiredService(); _contextData = requestContext.ContextData; - _requestAborted = requestContext.RequestAborted; + _requestAborted = requestAbortedOverride.CanBeCanceled + ? requestAbortedOverride + : requestContext.RequestAborted; _operation = operation; _variables = variables; _services = scopedServices; diff --git a/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.Subscription.cs b/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.Subscription.cs index 05d9262f60a..1194b0d0d3b 100644 --- a/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.Subscription.cs +++ b/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.Subscription.cs @@ -1,4 +1,5 @@ using System.Collections.Immutable; +using System.Runtime.CompilerServices; using HotChocolate.Execution.Instrumentation; using HotChocolate.Execution.Internal; using HotChocolate.Fetching; @@ -17,6 +18,9 @@ private sealed class Subscription : ISubscription, IAsyncDisposable private readonly IExecutionDiagnosticEvents _diagnosticEvents; private readonly IErrorHandler _errorHandler; private readonly ExecutionConcurrencyGate? _concurrencyGate; + private readonly TimeSpan _eventTimeout; + private readonly CancellationTokenSource _eventCts; + private readonly CancellationTokenRegistration _eventCtsRegistration; private IDisposable? _subscriptionScope; private readonly RequestContext _requestContext; private readonly ObjectType _subscriptionType; @@ -36,7 +40,8 @@ private Subscription( SelectionSet rootSelections, Func resolveQueryRootValue, IExecutionDiagnosticEvents diagnosticEvents, - ExecutionConcurrencyGate? concurrencyGate) + ExecutionConcurrencyGate? concurrencyGate, + TimeSpan eventTimeout) { unchecked { @@ -51,7 +56,13 @@ private Subscription( _resolveQueryRootValue = resolveQueryRootValue; _diagnosticEvents = diagnosticEvents; _concurrencyGate = concurrencyGate; + _eventTimeout = eventTimeout; _errorHandler = _requestContext.Schema.Services.GetRequiredService(); + + _eventCts = new CancellationTokenSource(); + _eventCtsRegistration = _requestContext.RequestAborted.UnsafeRegister( + static state => Unsafe.As(state)!.Cancel(), + _eventCts); } /// @@ -84,6 +95,9 @@ private Subscription( /// executions. Each event acquires a slot before invoking the query /// executor, mirroring the pipeline gate for queries and mutations. /// + /// + /// The maximum execution time applied to each subscription event. + /// /// /// Returns a new subscription instance. /// @@ -95,7 +109,8 @@ public static async ValueTask SubscribeAsync( SelectionSet rootSelections, Func resolveQueryRootValue, IExecutionDiagnosticEvents diagnosticsEvents, - ExecutionConcurrencyGate? concurrencyGate) + ExecutionConcurrencyGate? concurrencyGate, + TimeSpan executionTimeout) { var subscription = new Subscription( operationContextPool, @@ -105,7 +120,8 @@ public static async ValueTask SubscribeAsync( rootSelections, resolveQueryRootValue, diagnosticsEvents, - concurrencyGate); + concurrencyGate, + executionTimeout); subscription._subscriptionScope = diagnosticsEvents.ExecuteSubscription(requestContext, subscription.Id); subscription._sourceStream = await subscription.SubscribeAsync().ConfigureAwait(false); @@ -142,6 +158,11 @@ public async ValueTask DisposeAsync() // gracefully dispose the subscription scope. } + // Release the registration before disposing the CTS so that a late + // client-abort callback cannot race on a disposed source. + await _eventCtsRegistration.DisposeAsync().ConfigureAwait(false); + _eventCts.Dispose(); + _subscriptionScope?.Dispose(); _disposed = true; } @@ -170,11 +191,15 @@ private async Task OnEvent(object payload) var gateAcquired = false; OperationContext? operationContext = null; + // Arm the shared CTS so that execution is bounded by the configured event timeout. + _eventCts.CancelAfter(_eventTimeout); + var eventToken = _eventCts.Token; + try { if (_concurrencyGate is { IsEnabled: true }) { - await _concurrencyGate.WaitAsync(_requestContext.RequestAborted).ConfigureAwait(false); + await _concurrencyGate.WaitAsync(eventToken).ConfigureAwait(false); gateAcquired = true; } @@ -203,7 +228,8 @@ private async Task OnEvent(object payload) _requestContext.GetOperation(), _requestContext.VariableValues[0], rootValue, - _resolveQueryRootValue); + _resolveQueryRootValue, + requestAbortedOverride: eventToken); operationContext.Result.SetResultState(WellKnownContextData.EventMessage, payload); @@ -239,6 +265,12 @@ private async Task OnEvent(object payload) { _concurrencyGate!.Release(); } + + // Reset the shared CTS so the next event can start with a fresh budget. + // If TryReset() returns false the source was cancelled (timeout or + // client-abort); the thrown OperationCanceledException has already + // propagated and SubscriptionEnumerator will surface the teardown. + _eventCts.TryReset(); } } diff --git a/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.cs b/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.cs index 9d7d24b46d5..93cbbab2245 100644 --- a/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.cs +++ b/src/HotChocolate/Core/src/Types/Execution/Processing/SubscriptionExecutor.cs @@ -1,32 +1,19 @@ using System.Collections.Immutable; using HotChocolate.Execution.Instrumentation; +using HotChocolate.Execution.Options; using Microsoft.Extensions.ObjectPool; using static HotChocolate.Execution.ThrowHelper; namespace HotChocolate.Execution.Processing; -internal sealed partial class SubscriptionExecutor +internal sealed partial class SubscriptionExecutor( + ObjectPool operationContextPool, + QueryExecutor queryExecutor, + IErrorHandler errorHandler, + IExecutionDiagnosticEvents diagnosticEvents, + ExecutionConcurrencyGate? concurrencyGate, + IRequestTimeoutOptionsAccessor timeoutOptions) { - private readonly ObjectPool _operationContextPool; - private readonly QueryExecutor _queryExecutor; - private readonly IErrorHandler _errorHandler; - private readonly IExecutionDiagnosticEvents _diagnosticEvents; - private readonly ExecutionConcurrencyGate? _concurrencyGate; - - public SubscriptionExecutor( - ObjectPool operationContextPool, - QueryExecutor queryExecutor, - IErrorHandler errorHandler, - IExecutionDiagnosticEvents diagnosticEvents, - ExecutionConcurrencyGate? concurrencyGate = null) - { - _operationContextPool = operationContextPool; - _queryExecutor = queryExecutor; - _errorHandler = errorHandler; - _diagnosticEvents = diagnosticEvents; - _concurrencyGate = concurrencyGate; - } - public async Task ExecuteAsync( RequestContext requestContext, Func resolveQueryValue) @@ -55,14 +42,15 @@ public async Task ExecuteAsync( try { subscription = await Subscription.SubscribeAsync( - _operationContextPool, - _queryExecutor, + operationContextPool, + queryExecutor, requestContext, operation.RootType, selectionSet, resolveQueryValue, - _diagnosticEvents, - _concurrencyGate) + diagnosticEvents, + concurrencyGate, + timeoutOptions.ExecutionTimeout) .ConfigureAwait(false); var response = new ResponseStream(() => subscription.ExecuteAsync()); @@ -81,7 +69,7 @@ public async Task ExecuteAsync( } catch (Exception ex) { - var error = _errorHandler.Handle(ErrorBuilder.FromException(ex).Build()); + var error = errorHandler.Handle(ErrorBuilder.FromException(ex).Build()); if (subscription is not null) { diff --git a/src/HotChocolate/Core/test/Execution.Tests/Pipeline/ConcurrencyGateMiddlewareTests.cs b/src/HotChocolate/Core/test/Execution.Tests/Pipeline/ConcurrencyGateMiddlewareTests.cs index c37d994c7ed..9fd7adc6f59 100644 --- a/src/HotChocolate/Core/test/Execution.Tests/Pipeline/ConcurrencyGateMiddlewareTests.cs +++ b/src/HotChocolate/Core/test/Execution.Tests/Pipeline/ConcurrencyGateMiddlewareTests.cs @@ -155,7 +155,7 @@ public async Task Query_Should_Wait_For_Available_Slot_When_Gate_Is_Saturated() .AddGraphQL() .AddQueryType(d => { - d.Field("block").Resolve(async _ => + d.Field("block").Type().Resolve(async _ => { var index = Interlocked.Increment(ref counter) - 1; if (index < latches.Length) @@ -198,8 +198,12 @@ public async Task Query_Should_Wait_For_Available_Slot_When_Gate_Is_Saturated() var gate = executor.Schema.Services.GetRequiredService(); await WaitUntilAsync(async () => await IsSlotHeldAsync(gate), cts.Token); + var thirdRequest = OperationRequestBuilder.New() + .SetDocument("query Third { instant }") + .SetOperationName("Third") + .Build(); var thirdTask = Task.Run( - () => executor.ExecuteAsync("query Third { instant }", cts.Token), + () => executor.ExecuteAsync(thirdRequest, cts.Token), CancellationToken.None); // third has entered the pipeline but is still waiting on the gate @@ -231,7 +235,7 @@ public async Task Slot_Should_Be_Released_When_Resolver_Throws() .AddGraphQL() .AddQueryType(d => { - d.Field("throw").Resolve(_ => throw new InvalidOperationException("boom")); + d.Field("throw").Type().Resolve(_ => throw new InvalidOperationException("boom")); d.Field("ok").Resolve("ok"); }) .ConfigureSchemaServices(s => s.AddSingleton(new ExecutionConcurrencyGate(maxConcurrentExecutions: 1))) @@ -301,7 +305,7 @@ public async Task Query_Should_Fail_With_Timeout_Error_When_Wait_Exceeds_Executi .AddGraphQL() .AddQueryType(d => { - d.Field("block").Resolve(async _ => + d.Field("block").Type().Resolve(async _ => { await hold.WaitAsync(cts.Token); return "ok"; @@ -335,9 +339,14 @@ public async Task Query_Should_Fail_With_Timeout_Error_When_Wait_Exceeds_Executi e => Assert.Equal(ErrorCodes.Execution.Timeout, e.Code)); Assert.True(await IsSlotHeldAsync(gate)); - // release the blocking query; its slot returns and the gate is empty again + // drain the blocking query, under the shared ExecutionTimeout its own execution + // also times out (the same 200ms applies to whichever request is holding the slot). + // The blocker's fate is not what this test is verifying; the point is that the + // *waiting* request times out cleanly on the gate. Drain it so the slot is returned + // and verify the gate is empty. hold.Signal(); - Assert.Empty((await blocking).ExpectOperationResult().Errors); + await blocking; + await WaitUntilAsync(async () => !await IsSlotHeldAsync(gate), cts.Token); Assert.False(await IsSlotHeldAsync(gate)); } @@ -408,7 +417,7 @@ public async Task Subscription_Event_Should_Wait_For_Slot_When_Query_Is_In_Fligh .AddInMemorySubscriptions() .AddQueryType(d => { - d.Field("block").Resolve(async _ => + d.Field("block").Type().Resolve(async _ => { await queryGate.WaitAsync(cts.Token); lock (orderLock) @@ -427,7 +436,9 @@ public async Task Subscription_Event_Should_Wait_For_Slot_When_Query_Is_In_Fligh .GetRequestExecutorAsync(cancellationToken: cts.Token); var gate = executor.Schema.Services.GetRequiredService(); - var sender = executor.Schema.Services.GetRequiredService(); + // ITopicEventSender is registered in application services, not schema services, + // so it must be resolved via the root service provider accessor. + var sender = executor.Schema.Services.GetRootServiceProvider().GetRequiredService(); // subscribe var subscribe = await executor.ExecuteAsync("subscription { onMessage }", cts.Token); diff --git a/src/HotChocolate/Core/test/Execution.Tests/Pipeline/SubscriptionEventTimeoutTests.cs b/src/HotChocolate/Core/test/Execution.Tests/Pipeline/SubscriptionEventTimeoutTests.cs new file mode 100644 index 00000000000..112354e7bea --- /dev/null +++ b/src/HotChocolate/Core/test/Execution.Tests/Pipeline/SubscriptionEventTimeoutTests.cs @@ -0,0 +1,195 @@ +using HotChocolate.Subscriptions; +using HotChocolate.Types; +using Microsoft.Extensions.DependencyInjection; + +namespace HotChocolate.Execution.Pipeline; + +public sealed class SubscriptionEventTimeoutTests +{ + [Fact] + public async Task Event_Should_Time_Out_When_Execution_Exceeds_Configured_Budget() + { + // arrange + // A short per-event timeout. The resolver blocks until cancelled, so the event + // must time out and the subscription must tear down. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddInMemorySubscriptions() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .AddSubscriptionType() + .ModifyRequestOptions(o => o.ExecutionTimeout = TimeSpan.FromMilliseconds(200)) + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var sender = executor.Schema.Services.GetRootServiceProvider() + .GetRequiredService(); + + var subscribe = await executor.ExecuteAsync("subscription { onMessage }", cts.Token); + await using var stream = subscribe.ExpectResponseStream(); + var enumerator = stream.ReadResultsAsync().GetAsyncEnumerator(cts.Token); + + // act + await sender.SendAsync("OnMessage", "one", cts.Token); + + // MoveNextAsync must complete (with false) once the event budget elapses and the + // subscription tears down. + var moveNext = enumerator.MoveNextAsync().AsTask(); + var completed = await Task.WhenAny(moveNext, Task.Delay(5000, cts.Token)); + + // assert + Assert.Same(moveNext, completed); + Assert.False(await moveNext); + + await enumerator.DisposeAsync(); + } + + [Fact] + public async Task Timer_Should_Be_Reset_Between_Events() + { + // arrange + // Budget is short, but each event completes quickly. No event should time out + // and the subscription must keep running across multiple fires. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddInMemorySubscriptions() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .AddSubscriptionType() + .ModifyRequestOptions(o => o.ExecutionTimeout = TimeSpan.FromSeconds(1)) + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var sender = executor.Schema.Services.GetRootServiceProvider() + .GetRequiredService(); + + var subscribe = await executor.ExecuteAsync("subscription { onMessage }", cts.Token); + await using var stream = subscribe.ExpectResponseStream(); + var enumerator = stream.ReadResultsAsync().GetAsyncEnumerator(cts.Token); + + // act + // Fire several fast events, each separated by more than half the budget. If the + // CTS were not reset between events the second or third fire would already have + // a cancelled token. + for (var i = 0; i < 3; i++) + { + await sender.SendAsync("OnMessage", $"m{i}", cts.Token); + Assert.True(await enumerator.MoveNextAsync()); + Assert.Empty(enumerator.Current.Errors); + await Task.Delay(600, cts.Token); + } + + // assert + // A fourth event still succeeds — the timer was reset on every prior event. + await sender.SendAsync("OnMessage", "final", cts.Token); + Assert.True(await enumerator.MoveNextAsync()); + Assert.Empty(enumerator.Current.Errors); + + await enumerator.DisposeAsync(); + } + + [Fact] + public async Task Client_Abort_Should_Tear_Down_Current_Event_When_Timeout_Is_Configured() + { + // arrange + // With a per-event timeout configured the shared CTS must still observe the + // request-level abort. Start an event, abort the request, and confirm the + // subscription tears down. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + using var abortCts = new CancellationTokenSource(); + using var linked = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, abortCts.Token); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddInMemorySubscriptions() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .AddSubscriptionType() + .ModifyRequestOptions(o => o.ExecutionTimeout = TimeSpan.FromSeconds(30)) + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var sender = executor.Schema.Services.GetRootServiceProvider() + .GetRequiredService(); + + var subscribe = await executor.ExecuteAsync("subscription { onMessage }", linked.Token); + await using var stream = subscribe.ExpectResponseStream(); + var enumerator = stream.ReadResultsAsync().GetAsyncEnumerator(linked.Token); + + await sender.SendAsync("OnMessage", "one", cts.Token); + + var moveNext = enumerator.MoveNextAsync().AsTask(); + + // Give the event loop a brief moment to enter the blocking resolver before aborting. + await Task.Delay(100, cts.Token); + + // act + // abort the request — the shared event CTS should cancel via the registration + await abortCts.CancelAsync(); + + // assert + // MoveNext completes promptly (false = stream ended) once cancellation propagates. + var completed = await Task.WhenAny(moveNext, Task.Delay(2000, cts.Token)); + Assert.Same(moveNext, completed); + Assert.False(await moveNext); + + await enumerator.DisposeAsync(); + } + + [Fact] + public async Task Subscription_Should_Run_End_To_End_With_Default_Execution_Timeout() + { + // arrange + // Default configuration: ExecutionTimeout is 30 seconds. Events must flow + // normally and no spurious cancellation must occur under normal timing. + using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var executor = await new ServiceCollection() + .AddGraphQL() + .AddInMemorySubscriptions() + .AddQueryType(d => d.Field("foo").Resolve("bar")) + .AddSubscriptionType() + .Services + .BuildServiceProvider() + .GetRequestExecutorAsync(cancellationToken: cts.Token); + + var sender = executor.Schema.Services.GetRootServiceProvider() + .GetRequiredService(); + + var subscribe = await executor.ExecuteAsync("subscription { onMessage }", cts.Token); + await using var stream = subscribe.ExpectResponseStream(); + var enumerator = stream.ReadResultsAsync().GetAsyncEnumerator(cts.Token); + + // act + await sender.SendAsync("OnMessage", "hello", cts.Token); + var moved = await enumerator.MoveNextAsync(); + + // assert + Assert.True(moved); + Assert.Empty(enumerator.Current.Errors); + + await enumerator.DisposeAsync(); + } + + public sealed class EchoSubscription + { + [Subscribe] + public string OnMessage([EventMessage] string message) => message; + } + + public sealed class BlockingSubscription + { + [Subscribe] + public async Task OnMessage( + [EventMessage] string message, + CancellationToken cancellationToken) + { + await Task.Delay(Timeout.Infinite, cancellationToken); + return message; + } + } +} diff --git a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/OperationPlanExecutor.cs b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/OperationPlanExecutor.cs index 65c65b7516d..5c3278bbfed 100644 --- a/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/OperationPlanExecutor.cs +++ b/src/HotChocolate/Fusion/src/Fusion.Execution/Execution/OperationPlanExecutor.cs @@ -90,6 +90,7 @@ public async Task SubscribeAsync( subscriptionNode, subscriptionResult, requestContext.Schema.Services.GetService(), + requestContext.Schema.GetRequestOptions().ExecutionTimeout, executionCts.Token, cancellationToken); @@ -226,6 +227,7 @@ private static async IAsyncEnumerable CreateResponseStream( OperationExecutionNode subscriptionNode, SubscriptionResult subscriptionResult, ExecutionConcurrencyGate? concurrencyGate, + TimeSpan eventTimeout, [EnumeratorCancellation] CancellationToken executionCancellationToken, CancellationToken requestCancellationToken) { @@ -237,93 +239,123 @@ private static async IAsyncEnumerable CreateResponseStream( static state => Unsafe.As(state)!.TryResetToIdle(), executionState.Signal); + // We allocate a single CancellationTokenSource per subscription and reuse it + // across all events via TryReset(). The execution token is linked in so that + // client-abort / server-shutdown still propagates. + var eventCts = new CancellationTokenSource(); + var eventCtsRegistration = executionCancellationToken.UnsafeRegister( + static state => Unsafe.As(state)!.Cancel(), + eventCts); + var schemaName = subscriptionNode.SchemaName ?? context.GetDynamicSchemaName(subscriptionNode); - await foreach (var eventArgs in stream) + try { - using var scope = context.DiagnosticEvents.OnSubscriptionEvent( - context, - subscriptionNode, - schemaName, - subscriptionResult.Id); - - OperationResult result; - - var gateAcquired = false; - - try + await foreach (var eventArgs in stream) { - if (concurrencyGate is { IsEnabled: true }) - { - await concurrencyGate.WaitAsync(requestCancellationToken).ConfigureAwait(false); - gateAcquired = true; - } + using var scope = context.DiagnosticEvents.OnSubscriptionEvent( + context, + subscriptionNode, + schemaName, + subscriptionResult.Id); - context.Begin(eventArgs.StartTimestamp, eventArgs.Activity?.TraceId.ToHexString()); + OperationResult result; - executionState.Reset(); + var gateAcquired = false; - executionState.FillBacklog(plan); - executionState.EnqueueForCompletion( - new ExecutionNodeResult( - subscriptionNode.Id, - eventArgs.Activity, - eventArgs.Status, - eventArgs.Duration, - Exception: null, - DependentsToExecute: [], - SkippedDefinitions: [], - VariableValueSets: eventArgs.VariableValueSets)); + // Arm the shared CTS for this event and derive the per-event token so + // that each event is bounded by the configured execution timeout. + eventCts.CancelAfter(eventTimeout); + var eventToken = eventCts.Token; - while (!executionCancellationToken.IsCancellationRequested && executionState.IsProcessing()) + try { - while (executionState.TryDequeueCompletedResult(out var nodeResult)) + if (concurrencyGate is { IsEnabled: true }) { - var node = plan.GetNodeById(nodeResult.Id); - executionState.CompleteNode(plan, node, nodeResult); + await concurrencyGate.WaitAsync(eventToken).ConfigureAwait(false); + gateAcquired = true; } - executionState.EnqueueNextNodes(context, executionCancellationToken); + context.Begin(eventArgs.StartTimestamp, eventArgs.Activity?.TraceId.ToHexString()); + + executionState.Reset(); - if (executionCancellationToken.IsCancellationRequested || !executionState.IsProcessing()) + executionState.FillBacklog(plan); + executionState.EnqueueForCompletion( + new ExecutionNodeResult( + subscriptionNode.Id, + eventArgs.Activity, + eventArgs.Status, + eventArgs.Duration, + Exception: null, + DependentsToExecute: [], + SkippedDefinitions: [], + VariableValueSets: eventArgs.VariableValueSets)); + + while (!eventToken.IsCancellationRequested && executionState.IsProcessing()) { - break; + while (executionState.TryDequeueCompletedResult(out var nodeResult)) + { + var node = plan.GetNodeById(nodeResult.Id); + executionState.CompleteNode(plan, node, nodeResult); + } + + executionState.EnqueueNextNodes(context, eventToken); + + if (eventToken.IsCancellationRequested || !executionState.IsProcessing()) + { + break; + } + + // The signal will be set every time a node completes and will release the executor + // from the async wait to go through the completed results. + await executionState.Signal; } - // The signal will be set every time a node completes and will release the executor - // from the async wait to go through the completed results. - await executionState.Signal; - } + // If the original CancellationToken of the request was cancelled, + // the Execution nodes and the PlanExecutor should have been gracefully cancelled, + // so we throw here to properly cancel the request execution. + requestCancellationToken.ThrowIfCancellationRequested(); + // If the event budget was exhausted, surface it as a cancellation so the + // stream tears down and the caller can observe the timeout. + eventToken.ThrowIfCancellationRequested(); - // If the original CancellationToken of the request was cancelled, - // the Execution nodes and the PlanExecutor should have been gracefully cancelled, - // so we throw here to properly cancel the request execution. - requestCancellationToken.ThrowIfCancellationRequested(); + result = context.Complete(reusable: true); + } + catch (Exception ex) when (ex is not OperationCanceledException) + { + context.DiagnosticEvents.SubscriptionEventError( + context, + subscriptionNode, + schemaName, + subscriptionResult.Id, + ex); + throw; + } + finally + { + // disposing the eventArgs disposes the telemetry scope. + eventArgs.Dispose(); - result = context.Complete(reusable: true); - } - catch (Exception ex) when (ex is not OperationCanceledException) - { - context.DiagnosticEvents.SubscriptionEventError( - context, - subscriptionNode, - schemaName, - subscriptionResult.Id, - ex); - throw; - } - finally - { - // disposing the eventArgs disposes the telemetry scope. - eventArgs.Dispose(); + if (gateAcquired) + { + concurrencyGate!.Release(); + } - if (gateAcquired) - { - concurrencyGate!.Release(); + // Reset the shared CTS so the next event can start with a fresh budget. + // If TryReset() returns false the source was cancelled (timeout or + // client-abort); the thrown OperationCanceledException has already + // propagated and the enumerator surfaces the teardown. + eventCts.TryReset(); } - } - yield return result; + yield return result; + } + } + finally + { + await eventCtsRegistration.DisposeAsync(); + eventCts?.Dispose(); } } } From 3d90cb97261d405f0d229ea210a98830c00c9216 Mon Sep 17 00:00:00 2001 From: Michael Staib Date: Mon, 20 Apr 2026 11:37:36 +0200 Subject: [PATCH 3/3] update docs --- website/src/docs/fusion/v16/performance-tuning.md | 14 +++++++++++++- .../v16/migrating/migrate-from-15-to-16.md | 4 +++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/website/src/docs/fusion/v16/performance-tuning.md b/website/src/docs/fusion/v16/performance-tuning.md index 66ca9b2d266..c61b62bb934 100644 --- a/website/src/docs/fusion/v16/performance-tuning.md +++ b/website/src/docs/fusion/v16/performance-tuning.md @@ -183,7 +183,19 @@ app.MapGraphQLHttp() Start with the default of 64 and adjust based on your workload. If you expect many long-lived subscriptions firing frequent events, factor those into your sizing. They now contend for the same slots as queries and mutations. Set to `null` to disable the limit entirely. -Queued executions wait on the gate; the `ExecutionTimeout` setting (default 30 seconds) bounds how long they can wait before the request is cancelled with a clean timeout error. If you regularly see executions timing out at the gate, that's a signal to scale out or raise the limit rather than extend the timeout. +### Execution Cancellation + +Every execution is bounded by the `ExecutionTimeout` option (default 30 seconds). This applies uniformly to queries, mutations, subscription handshakes, and each subscription event. The budget covers both the time an execution spends waiting for a concurrency slot and the time it spends running. When the budget is exceeded, the execution is cancelled and the caller receives a clean timeout error. + +`ExecutionTimeout` is the single setting that controls cancellation for every execution. Configure it with `ModifyRequestOptions`: + +```csharp +builder + .AddGraphQLGateway() + .ModifyRequestOptions(o => o.ExecutionTimeout = TimeSpan.FromSeconds(10)); +``` + +If executions routinely time out at the gate, that is a signal to scale out or raise `MaxConcurrentExecutions`. Increasing `ExecutionTimeout` only defers the problem. ## Next Steps diff --git a/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md b/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md index 5c5c57ae463..c4f5ce0bfa2 100644 --- a/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md +++ b/website/src/docs/hotchocolate/v16/migrating/migrate-from-15-to-16.md @@ -1096,6 +1096,8 @@ builder .ModifyServerOptions(o => o.MaxConcurrentExecutions = 128); ``` -The default is **64**. Operations that arrive while the gate is full queue up and run as slots free. The `ExecutionTimeout` setting (default 30 seconds) bounds how long an operation can wait before it is cancelled with a clean timeout error. Set the limit to `null` to disable the gate entirely. +The default is **64**. Operations that arrive while the gate is full queue up and run as slots free. Set the limit to `null` to disable the gate entirely. + +Every execution is bounded by the `ExecutionTimeout` option (default 30 seconds). This applies uniformly to queries, mutations, subscription handshakes, and each subscription event. The budget covers both the time an execution spends waiting for a concurrency slot and the time it spends running. When the budget is exceeded, the execution is cancelled and the caller receives a clean timeout error. `ExecutionTimeout` is the single setting that controls cancellation for every execution. Subscriptions participate in the limit like any other operation. The initial subscribe consumes a slot while the subscribe resolver runs, and each emitted event consumes a slot while its result is being produced. Idle subscriptions (waiting on the next event) cost nothing. The slot is released between events.