From 399941091211318e1821b9f053ef911829aebe02 Mon Sep 17 00:00:00 2001 From: Shelton Cai Date: Tue, 17 Feb 2026 11:59:02 -0800 Subject: [PATCH] Coordinator Task Throttling Bug Fix (#27146) Summary: Pull Request resolved: https://github.com/prestodb/presto/pull/27146 With coordinator task-based throttling (queueing) enabled, we run into an issue where certain resource groups are never updated to be eligible to run. This occurs when the resource group is created during a task throttling period and canRun returns false, resulting in the resource group never being added as an eligible subgroup on creation. When we exit task throttling, an eligibility update is never triggered. If this group doesn't have a new query added after we exit task throttling, its status is never updated. Changes: 1. Move the isTaskLimitExceeded check from canRunMore to internalStartNext; canRunMore will return true, allowing the group to be marked as eligible, but internalStartNext will prevent the group from running more queries. 2. Add a check to enqueue immediate execution candidates while task throttling is active. 3. Remove "experimental" from the session property name. 4. Add tests to ensure resource groups properly queue/run queries with task limits (should this be in resourceGroups or testQueryTaskLimit?) 
Reviewed By: spershin Differential Revision: D92632990 --- .../src/main/sphinx/admin/properties.rst | 34 +++ .../presto/execution/QueryManagerConfig.java | 3 +- .../resourceGroups/InternalResourceGroup.java | 21 +- .../execution/TestQueryManagerConfig.java | 2 +- .../resourceGroups/TestResourceGroups.java | 211 ++++++++++++++++++ .../presto/tests/TestQueryTaskLimit.java | 2 +- 6 files changed, 265 insertions(+), 8 deletions(-) diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index 56d8eda7c8b0b..150130a9b39fb 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -1368,6 +1368,40 @@ This allows the cluster to quickly ramp up when idle while still providing protection against overload when the cluster is busy. Set to ``0`` to always apply pacing when ``max-queries-per-second`` is configured. +``max-total-running-task-count-to-not-execute-new-query`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``integer`` +* **Minimum value:** ``1`` +* **Default value:** ``2147483647`` (unlimited) + +Maximum total running task count across all queries on the coordinator. When +this threshold is exceeded, new queries are held in the queue rather than +being scheduled for execution. This helps prevent coordinator overload by +limiting the number of concurrent tasks being managed. + +Unlike ``max-total-running-task-count-to-kill-query`` which kills queries when +the limit is exceeded, this property proactively prevents new queries from +starting while allowing existing queries to complete normally. + +This property works in conjunction with query admission pacing +(``query-manager.query-pacing.max-queries-per-second``) to provide +comprehensive coordinator load management. When both are configured: + +1. Pacing controls the rate at which queries are admitted +2. 
This property provides a hard cap on total concurrent tasks + +Without query-pacing, the cluster can admit multiple queries at once, which +can lead to significantly more concurrent tasks than expected over this limit. + +Set to a lower value (e.g., ``50000``) to limit coordinator task management +overhead. The default value effectively disables this feature. + +.. note:: + + For backwards compatibility, this property can also be configured using the + legacy name ``experimental.max-total-running-task-count-to-not-execute-new-query``. + Query Retry Properties ---------------------- diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java index 838f69f9fad87..c3c6547629097 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java @@ -321,7 +321,8 @@ public int getMaxQueryRunningTaskCount() return maxQueryRunningTaskCount; } - @Config("experimental.max-total-running-task-count-to-not-execute-new-query") + @LegacyConfig("experimental.max-total-running-task-count-to-not-execute-new-query") + @Config("max-total-running-task-count-to-not-execute-new-query") @ConfigDescription("Keep new queries in the queue if total task count exceeds this threshold") public QueryManagerConfig setMaxTotalRunningTaskCountToNotExecuteNewQuery(int maxTotalRunningTaskCountToNotExecuteNewQuery) { diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java index 9bd91021d34cc..b13edb21603e9 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java +++ 
b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java @@ -740,7 +740,18 @@ public void run(ManagedQueryExecution query) else { query.setResourceGroupQueryLimits(perQueryLimits); boolean immediateStartCandidate = canRun && queuedQueries.isEmpty(); - if (immediateStartCandidate && queryPacingContext.tryAcquireAdmissionSlot()) { + boolean startQuery = immediateStartCandidate; + if (immediateStartCandidate) { + // Check for coordinator overload (task limit exceeded or denied admission) + //isTaskLimitExceeded MUST be checked before tryAcquireAdmissionSlot, or else admission slots will be acquired but not started + boolean coordOverloaded = ((RootInternalResourceGroup) root).isTaskLimitExceeded() + || !queryPacingContext.tryAcquireAdmissionSlot(); + if (coordOverloaded) { + startQuery = false; + } + } + + if (startQuery) { startInBackground(query); } else { @@ -914,6 +925,10 @@ protected boolean internalStartNext() { checkState(Thread.holdsLock(root), "Must hold lock to find next query"); synchronized (root) { + if (((RootInternalResourceGroup) root).isTaskLimitExceeded()) { + return false; + } + if (!canRunMore()) { return false; } @@ -1052,10 +1067,6 @@ private boolean canRunMore() return false; } - if (((RootInternalResourceGroup) root).isTaskLimitExceeded()) { - return false; - } - int hardConcurrencyLimit = getHardConcurrencyLimitBasedOnCpuUsage(); int totalRunningQueries = runningQueries.size() + descendantRunningQueries; diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java index 109a1be212e3c..3015c08993a43 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java @@ -102,7 +102,7 @@ public void testExplicitPropertyMappings() 
.put("query.stage-count-warning-threshold", "12300") .put("max-total-running-task-count-to-kill-query", "60000") .put("max-query-running-task-count", "10000") - .put("experimental.max-total-running-task-count-to-not-execute-new-query", "50000") + .put("max-total-running-task-count-to-not-execute-new-query", "50000") .put("concurrency-threshold-to-enable-resource-group-refresh", "2") .put("resource-group-runtimeinfo-refresh-interval", "10ms") .put("query.schedule-split-batch-size", "99") diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java index 43e9ba467e92b..79b035e279d39 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java @@ -1098,4 +1098,215 @@ public String getName() return new ClusterResourceChecker(mockPolicy, config, createNodeManager()); } + + // Tests that when task limit is exceeded, new queries are queued instead of starting immediately + @Test(timeOut = 10_000) + public void testTaskLimitExceededQueuesQuery() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + // Set task limit exceeded + root.setTaskLimitExceeded(true); + + // Submit a query - it should be queued because task limit is exceeded + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + root.run(query1); + + // Query should be queued, not running + assertEquals(query1.getState(), 
QUEUED); + assertEquals(root.getQueuedQueries(), 1); + assertEquals(root.getRunningQueries(), 0); + } + + // Tests that queued queries start when task limit is no longer exceeded + @Test(timeOut = 10_000) + public void testQueryStartsWhenTaskLimitClears() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + // Set task limit exceeded + root.setTaskLimitExceeded(true); + + // Submit queries - they should be queued + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + root.run(query1); + MockManagedQueryExecution query2 = new MockManagedQueryExecution(0); + query2.startWaitingForPrerequisites(); + root.run(query2); + + assertEquals(query1.getState(), QUEUED); + assertEquals(query2.getState(), QUEUED); + assertEquals(root.getQueuedQueries(), 2); + assertEquals(root.getRunningQueries(), 0); + + // Clear task limit + root.setTaskLimitExceeded(false); + + // Process queued queries - they should now start + root.processQueuedQueries(); + + assertEquals(query1.getState(), RUNNING); + assertEquals(query2.getState(), RUNNING); + assertEquals(root.getQueuedQueries(), 0); + assertEquals(root.getRunningQueries(), 2); + } + + // Tests that queries in a subgroup hierarchy are properly queued and started when task limit changes + @Test(timeOut = 10_000) + public void testTaskLimitExceededWithSubgroups() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, 
MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + InternalResourceGroup groupA = root.getOrCreateSubGroup("A", true); + groupA.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + groupA.setMaxQueuedQueries(10); + groupA.setHardConcurrencyLimit(10); + + InternalResourceGroup groupG = groupA.getOrCreateSubGroup("G", true); + groupG.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + groupG.setMaxQueuedQueries(10); + groupG.setHardConcurrencyLimit(10); + + // Set task limit exceeded + root.setTaskLimitExceeded(true); + + // Submit a query to leaf group G - it should be queued + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + groupG.run(query1); + + assertEquals(query1.getState(), QUEUED); + assertEquals(groupG.getQueuedQueries(), 1); + assertEquals(groupG.getRunningQueries(), 0); + + // Clear task limit and process queued queries + root.setTaskLimitExceeded(false); + root.processQueuedQueries(); + + // Query should now be running + assertEquals(query1.getState(), RUNNING); + assertEquals(groupG.getQueuedQueries(), 0); + assertEquals(groupG.getRunningQueries(), 1); + } + + // Tests that when task limit is exceeded, queries already running continue, but new ones are queued + @Test(timeOut = 10_000) + public void testTaskLimitExceededDoesNotAffectRunningQueries() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + // Submit a query before task limit is exceeded - it should run + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + root.run(query1); + assertEquals(query1.getState(), RUNNING); + + // 
Now set task limit exceeded + root.setTaskLimitExceeded(true); + + // Submit another query - it should be queued + MockManagedQueryExecution query2 = new MockManagedQueryExecution(0); + query2.startWaitingForPrerequisites(); + root.run(query2); + assertEquals(query2.getState(), QUEUED); + + // The first query should still be running + assertEquals(query1.getState(), RUNNING); + assertEquals(root.getRunningQueries(), 1); + assertEquals(root.getQueuedQueries(), 1); + } + + // Tests that task limit transitions work correctly with multiple cycles + @Test(timeOut = 10_000) + public void testTaskLimitExceededMultipleCycles() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + // Cycle 1: Task limit exceeded, query queued + root.setTaskLimitExceeded(true); + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + root.run(query1); + assertEquals(query1.getState(), QUEUED); + + // Clear task limit, query starts + root.setTaskLimitExceeded(false); + root.processQueuedQueries(); + assertEquals(query1.getState(), RUNNING); + + // Cycle 2: Task limit exceeded again, new query queued + root.setTaskLimitExceeded(true); + MockManagedQueryExecution query2 = new MockManagedQueryExecution(0); + query2.startWaitingForPrerequisites(); + root.run(query2); + assertEquals(query2.getState(), QUEUED); + assertEquals(query1.getState(), RUNNING); // query1 still running + + // Complete query1, processQueuedQueries should not start query2 (task limit still exceeded) + query1.complete(); + root.processQueuedQueries(); + assertEquals(query2.getState(), QUEUED); // Still queued because task limit exceeded + + // 
Clear task limit, query2 starts + root.setTaskLimitExceeded(false); + root.processQueuedQueries(); + assertEquals(query2.getState(), RUNNING); + } } diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java index 4c62a887cced1..b0bb921fa1e9f 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java @@ -94,7 +94,7 @@ public void testQueuingWhenTaskLimitExceeds() { ImmutableMap extraProperties = ImmutableMap.builder() .put("experimental.spill-enabled", "false") - .put("experimental.max-total-running-task-count-to-not-execute-new-query", "2") + .put("max-total-running-task-count-to-not-execute-new-query", "2") .build(); try (DistributedQueryRunner queryRunner = createQueryRunner(defaultSession, extraProperties)) {