diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index 56d8eda7c8b0b..150130a9b39fb 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -1368,6 +1368,40 @@ This allows the cluster to quickly ramp up when idle while still providing protection against overload when the cluster is busy. Set to ``0`` to always apply pacing when ``max-queries-per-second`` is configured. +``max-total-running-task-count-to-not-execute-new-query`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** ``integer`` +* **Minimum value:** ``1`` +* **Default value:** ``2147483647`` (unlimited) + +Maximum total running task count across all queries on the coordinator. When +this threshold is exceeded, new queries are held in the queue rather than +being scheduled for execution. This helps prevent coordinator overload by +limiting the number of concurrent tasks being managed. + +Unlike ``max-total-running-task-count-to-kill-query`` which kills queries when +the limit is exceeded, this property proactively prevents new queries from +starting while allowing existing queries to complete normally. + +This property works in conjunction with query admission pacing +(``query-manager.query-pacing.max-queries-per-second``) to provide +comprehensive coordinator load management. When both are configured: + +1. Pacing controls the rate at which queries are admitted +2. This property provides a hard cap on total concurrent tasks + +Without query-pacing, the cluster can admit multiple queries at once, which +can lead to significantly more concurrent tasks than expected over this limit. + +Set to a lower value (e.g., ``50000``) to limit coordinator task management +overhead. The default value effectively disables this feature. + +.. note:: + + For backwards compatibility, this property can also be configured using the + legacy name ``experimental.max-total-running-task-count-to-not-execute-new-query``. + Query Retry Properties ---------------------- diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java index 838f69f9fad87..c3c6547629097 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java @@ -321,7 +321,8 @@ public int getMaxQueryRunningTaskCount() return maxQueryRunningTaskCount; } - @Config("experimental.max-total-running-task-count-to-not-execute-new-query") + @LegacyConfig("experimental.max-total-running-task-count-to-not-execute-new-query") + @Config("max-total-running-task-count-to-not-execute-new-query") @ConfigDescription("Keep new queries in the queue if total task count exceeds this threshold") public QueryManagerConfig setMaxTotalRunningTaskCountToNotExecuteNewQuery(int maxTotalRunningTaskCountToNotExecuteNewQuery) { diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java index 9bd91021d34cc..b13edb21603e9 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java +++ b/presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java @@ -740,7 +740,18 @@ public void run(ManagedQueryExecution query) else { query.setResourceGroupQueryLimits(perQueryLimits); boolean immediateStartCandidate = canRun && queuedQueries.isEmpty(); - if (immediateStartCandidate && queryPacingContext.tryAcquireAdmissionSlot()) { + boolean startQuery = immediateStartCandidate; + if (immediateStartCandidate) { + // Check for coordinator overload (task limit exceeded or denied admission) + //isTaskLimitExceeded MUST be checked before tryAcquireAdmissionSlot, or else admission slots will be acquired but not started + boolean coordOverloaded = ((RootInternalResourceGroup) root).isTaskLimitExceeded() + || !queryPacingContext.tryAcquireAdmissionSlot(); + if (coordOverloaded) { + startQuery = false; + } + } + + if (startQuery) { startInBackground(query); } else { @@ -914,6 +925,10 @@ protected boolean internalStartNext() { checkState(Thread.holdsLock(root), "Must hold lock to find next query"); synchronized (root) { + if (((RootInternalResourceGroup) root).isTaskLimitExceeded()) { + return false; + } + if (!canRunMore()) { return false; } @@ -1052,10 +1067,6 @@ private boolean canRunMore() return false; } - if (((RootInternalResourceGroup) root).isTaskLimitExceeded()) { - return false; - } - int hardConcurrencyLimit = getHardConcurrencyLimitBasedOnCpuUsage(); int totalRunningQueries = runningQueries.size() + descendantRunningQueries; diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java index 109a1be212e3c..3015c08993a43 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java @@ -102,7 +102,7 @@ public void testExplicitPropertyMappings() .put("query.stage-count-warning-threshold", "12300") .put("max-total-running-task-count-to-kill-query", "60000") .put("max-query-running-task-count", "10000") - .put("experimental.max-total-running-task-count-to-not-execute-new-query", "50000") + .put("max-total-running-task-count-to-not-execute-new-query", "50000") .put("concurrency-threshold-to-enable-resource-group-refresh", "2") .put("resource-group-runtimeinfo-refresh-interval", "10ms") .put("query.schedule-split-batch-size", "99") diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java index 43e9ba467e92b..79b035e279d39 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java +++ b/presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java @@ -1098,4 +1098,215 @@ public String getName() return new ClusterResourceChecker(mockPolicy, config, createNodeManager()); } + + // Tests that when task limit is exceeded, new queries are queued instead of starting immediately + @Test(timeOut = 10_000) + public void testTaskLimitExceededQueuesQuery() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + // Set task limit exceeded + root.setTaskLimitExceeded(true); + + // Submit a query - it should be queued because task limit is exceeded + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + root.run(query1); + + // Query should be queued, not running + assertEquals(query1.getState(), QUEUED); + assertEquals(root.getQueuedQueries(), 1); + assertEquals(root.getRunningQueries(), 0); + } + + // Tests that queued queries start when task limit is no longer exceeded + @Test(timeOut = 10_000) + public void testQueryStartsWhenTaskLimitClears() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + // Set task limit exceeded + root.setTaskLimitExceeded(true); + + // Submit queries - they should be queued + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + root.run(query1); + MockManagedQueryExecution query2 = new MockManagedQueryExecution(0); + query2.startWaitingForPrerequisites(); + root.run(query2); + + assertEquals(query1.getState(), QUEUED); + assertEquals(query2.getState(), QUEUED); + assertEquals(root.getQueuedQueries(), 2); + assertEquals(root.getRunningQueries(), 0); + + // Clear task limit + root.setTaskLimitExceeded(false); + + // Process queued queries - they should now start + root.processQueuedQueries(); + + assertEquals(query1.getState(), RUNNING); + assertEquals(query2.getState(), RUNNING); + assertEquals(root.getQueuedQueries(), 0); + assertEquals(root.getRunningQueries(), 2); + } + + // Tests that queries in a subgroup hierarchy are properly queued and started when task limit changes + @Test(timeOut = 10_000) + public void testTaskLimitExceededWithSubgroups() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + InternalResourceGroup groupA = root.getOrCreateSubGroup("A", true); + groupA.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + groupA.setMaxQueuedQueries(10); + groupA.setHardConcurrencyLimit(10); + + InternalResourceGroup groupG = groupA.getOrCreateSubGroup("G", true); + groupG.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + groupG.setMaxQueuedQueries(10); + groupG.setHardConcurrencyLimit(10); + + // Set task limit exceeded + root.setTaskLimitExceeded(true); + + // Submit a query to leaf group G - it should be queued + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + groupG.run(query1); + + assertEquals(query1.getState(), QUEUED); + assertEquals(groupG.getQueuedQueries(), 1); + assertEquals(groupG.getRunningQueries(), 0); + + // Clear task limit and process queued queries + root.setTaskLimitExceeded(false); + root.processQueuedQueries(); + + // Query should now be running + assertEquals(query1.getState(), RUNNING); + assertEquals(groupG.getQueuedQueries(), 0); + assertEquals(groupG.getRunningQueries(), 1); + } + + // Tests that when task limit is exceeded, queries already running continue, but new ones are queued + @Test(timeOut = 10_000) + public void testTaskLimitExceededDoesNotAffectRunningQueries() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + // Submit a query before task limit is exceeded - it should run + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + root.run(query1); + assertEquals(query1.getState(), RUNNING); + + // Now set task limit exceeded + root.setTaskLimitExceeded(true); + + // Submit another query - it should be queued + MockManagedQueryExecution query2 = new MockManagedQueryExecution(0); + query2.startWaitingForPrerequisites(); + root.run(query2); + assertEquals(query2.getState(), QUEUED); + + // The first query should still be running + assertEquals(query1.getState(), RUNNING); + assertEquals(root.getRunningQueries(), 1); + assertEquals(root.getQueuedQueries(), 1); + } + + // Tests that task limit transitions work correctly with multiple cycles + @Test(timeOut = 10_000) + public void testTaskLimitExceededMultipleCycles() + { + RootInternalResourceGroup root = new RootInternalResourceGroup( + "root", + (group, export) -> {}, + directExecutor(), + ignored -> Optional.empty(), + rg -> false, + createNodeManager(), + createClusterResourceChecker(), + QueryPacingContext.NOOP); + root.setSoftMemoryLimit(new DataSize(1, MEGABYTE)); + root.setMaxQueuedQueries(10); + root.setHardConcurrencyLimit(10); + + // Cycle 1: Task limit exceeded, query queued + root.setTaskLimitExceeded(true); + MockManagedQueryExecution query1 = new MockManagedQueryExecution(0); + query1.startWaitingForPrerequisites(); + root.run(query1); + assertEquals(query1.getState(), QUEUED); + + // Clear task limit, query starts + root.setTaskLimitExceeded(false); + root.processQueuedQueries(); + assertEquals(query1.getState(), RUNNING); + + // Cycle 2: Task limit exceeded again, new query queued + root.setTaskLimitExceeded(true); + MockManagedQueryExecution query2 = new MockManagedQueryExecution(0); + query2.startWaitingForPrerequisites(); + root.run(query2); + assertEquals(query2.getState(), QUEUED); + assertEquals(query1.getState(), RUNNING); // query1 still running + + // Complete query1, processQueuedQueries should not start query2 (task limit still exceeded) + query1.complete(); + root.processQueuedQueries(); + assertEquals(query2.getState(), QUEUED); // Still queued because task limit exceeded + + // Clear task limit, query2 starts + root.setTaskLimitExceeded(false); + root.processQueuedQueries(); + assertEquals(query2.getState(), RUNNING); + } } diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java index 4c62a887cced1..b0bb921fa1e9f 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestQueryTaskLimit.java @@ -94,7 +94,7 @@ public void testQueuingWhenTaskLimitExceeds() { ImmutableMap extraProperties = ImmutableMap.builder() .put("experimental.spill-enabled", "false") - .put("experimental.max-total-running-task-count-to-not-execute-new-query", "2") + .put("max-total-running-task-count-to-not-execute-new-query", "2") .build(); try (DistributedQueryRunner queryRunner = createQueryRunner(defaultSession, extraProperties)) {