Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions presto-docs/src/main/sphinx/admin/properties.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,40 @@ This allows the cluster to quickly ramp up when idle while still providing
protection against overload when the cluster is busy. Set to ``0`` to always
apply pacing when ``max-queries-per-second`` is configured.

``max-total-running-task-count-to-not-execute-new-query``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Minimum value:** ``1``
* **Default value:** ``2147483647`` (unlimited)

Maximum total running task count across all queries on the coordinator. When
this threshold is exceeded, new queries are held in the queue rather than
being scheduled for execution. This helps prevent coordinator overload by
limiting the number of concurrent tasks being managed.

Unlike ``max-total-running-task-count-to-kill-query`` which kills queries when
the limit is exceeded, this property proactively prevents new queries from
starting while allowing existing queries to complete normally.

This property works in conjunction with query admission pacing
(``query-manager.query-pacing.max-queries-per-second``) to provide
comprehensive coordinator load management. When both are configured:

1. Pacing controls the rate at which queries are admitted
2. This property provides a hard cap on total concurrent tasks

Without query-pacing, the cluster can admit multiple queries at once, which
can lead to significantly more concurrent tasks than expected over this limit.

Set to a lower value (e.g., ``50000``) to limit coordinator task management
overhead. The default value effectively disables this feature.

.. note::

For backwards compatibility, this property can also be configured using the
legacy name ``experimental.max-total-running-task-count-to-not-execute-new-query``.

Query Retry Properties
----------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ public int getMaxQueryRunningTaskCount()
return maxQueryRunningTaskCount;
}

@Config("experimental.max-total-running-task-count-to-not-execute-new-query")
@LegacyConfig("experimental.max-total-running-task-count-to-not-execute-new-query")
@Config("max-total-running-task-count-to-not-execute-new-query")
@ConfigDescription("Keep new queries in the queue if total task count exceeds this threshold")
public QueryManagerConfig setMaxTotalRunningTaskCountToNotExecuteNewQuery(int maxTotalRunningTaskCountToNotExecuteNewQuery)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,18 @@ public void run(ManagedQueryExecution query)
else {
query.setResourceGroupQueryLimits(perQueryLimits);
boolean immediateStartCandidate = canRun && queuedQueries.isEmpty();
if (immediateStartCandidate && queryPacingContext.tryAcquireAdmissionSlot()) {
boolean startQuery = immediateStartCandidate;
if (immediateStartCandidate) {
// Check for coordinator overload (task limit exceeded or denied admission)
//isTaskLimitExceeded MUST be checked before tryAcquireAdmissionSlot, or else admission slots will be acquired but not started
boolean coordOverloaded = ((RootInternalResourceGroup) root).isTaskLimitExceeded()
|| !queryPacingContext.tryAcquireAdmissionSlot();
if (coordOverloaded) {
startQuery = false;
}
}

if (startQuery) {
startInBackground(query);
}
else {
Expand Down Expand Up @@ -914,6 +925,10 @@ protected boolean internalStartNext()
{
checkState(Thread.holdsLock(root), "Must hold lock to find next query");
synchronized (root) {
if (((RootInternalResourceGroup) root).isTaskLimitExceeded()) {
return false;
}

if (!canRunMore()) {
return false;
}
Expand Down Expand Up @@ -1052,10 +1067,6 @@ private boolean canRunMore()
return false;
}

if (((RootInternalResourceGroup) root).isTaskLimitExceeded()) {
return false;
}

int hardConcurrencyLimit = getHardConcurrencyLimitBasedOnCpuUsage();

int totalRunningQueries = runningQueries.size() + descendantRunningQueries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testExplicitPropertyMappings()
.put("query.stage-count-warning-threshold", "12300")
.put("max-total-running-task-count-to-kill-query", "60000")
.put("max-query-running-task-count", "10000")
.put("experimental.max-total-running-task-count-to-not-execute-new-query", "50000")
.put("max-total-running-task-count-to-not-execute-new-query", "50000")
.put("concurrency-threshold-to-enable-resource-group-refresh", "2")
.put("resource-group-runtimeinfo-refresh-interval", "10ms")
.put("query.schedule-split-batch-size", "99")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1098,4 +1098,215 @@ public String getName()

return new ClusterResourceChecker(mockPolicy, config, createNodeManager());
}

// Tests that when task limit is exceeded, new queries are queued instead of starting immediately
@Test(timeOut = 10_000)
public void testTaskLimitExceededQueuesQuery()
{
RootInternalResourceGroup root = new RootInternalResourceGroup(
"root",
(group, export) -> {},
directExecutor(),
ignored -> Optional.empty(),
rg -> false,
createNodeManager(),
createClusterResourceChecker(),
QueryPacingContext.NOOP);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider adding a test that covers the tryAcquireAdmissionSlot failure path (coordinator overload without task-limit flag)

The production logic now treats the coordinator as overloaded if either tryAcquireAdmissionSlot() fails or taskLimitExceeded is true, but the new tests only cover the taskLimitExceeded branch. Since QueryPacingContext.NOOP always admits, the tryAcquireAdmissionSlot() failure path isn’t exercised. Please add a test-only QueryPacingContext that can return false from tryAcquireAdmissionSlot(), and verify that (1) immediate-start candidates are queued instead of started, and (2) once slots are available, processQueuedQueries() starts the queued queries, to fully validate the coordOverloaded logic.

Suggested implementation:

    private static final class TestQueryPacingContext
            implements QueryPacingContext
    {
        private final java.util.concurrent.atomic.AtomicBoolean admit;
        private final java.util.concurrent.atomic.AtomicInteger inUseSlots = new java.util.concurrent.atomic.AtomicInteger();

        private TestQueryPacingContext(boolean initialAdmit)
        {
            this.admit = new java.util.concurrent.atomic.AtomicBoolean(initialAdmit);
        }

        @Override
        public boolean tryAcquireAdmissionSlot()
        {
            if (!admit.get()) {
                return false;
            }
            inUseSlots.incrementAndGet();
            return true;
        }

        @Override
        public void releaseAdmissionSlot()
        {
            inUseSlots.decrementAndGet();
        }

        public void setAdmit(boolean value)
        {
            admit.set(value);
        }

        public int getInUseSlots()
        {
            return inUseSlots.get();
        }
    }

    @Test
    public void testAdmissionSlotFailureQueuesAndStartsQueries()
    {
        TestQueryPacingContext pacingContext = new TestQueryPacingContext(false);

        RootInternalResourceGroup root = new RootInternalResourceGroup(
                "root",
                (group, export) -> {},
                directExecutor(),
                ignored -> Optional.empty(),
                rg -> false,
                createNodeManager(),
                createClusterResourceChecker(),
                pacingContext);
        root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
        root.setMaxQueuedQueries(10);
        root.setHardConcurrencyLimit(10);

        // No task limit; overload purely due to admission slots
        root.setTaskLimitExceeded(false);

        // Immediate-start candidate should be queued because tryAcquireAdmissionSlot() fails
        MockManagedQueryExecution first = new MockManagedQueryExecution(0);
        first.startWaitingForPrerequisites();
        root.run(first);

        assertEquals(root.getQueuedQueries(), 1);
        assertFalse(first.getState().isDone());

        // Allow admissions and process queued queries
        pacingContext.setAdmit(true);
        root.processQueuedQueries();

        assertEquals(root.getQueuedQueries(), 0);
        assertTrue(first.isDone());
    }

    public void testTaskLimitExceededQueuesQuery()

To integrate this cleanly with the existing file, you’ll likely need to:

  1. Adjust the QueryPacingContext method signatures if they differ in your codebase. For example, if tryAcquireAdmissionSlot takes parameters (e.g., the resource group or query), update the overrides in TestQueryPacingContext to match, and delegate admission behavior based on those parameters as needed.
  2. Add imports if they aren’t already present at the top of TestResourceGroups.java:
    • import java.util.Optional; (likely already there)
    • import org.testng.annotations.Test; (likely already there for other tests)
    • import static org.testng.Assert.assertEquals;
    • import static org.testng.Assert.assertFalse;
    • import static org.testng.Assert.assertTrue;
    • If you prefer, replace the fully qualified java.util.concurrent.atomic.AtomicBoolean/AtomicInteger with imports for those classes.
  3. Ensure concurrency/accounting expectations:
    • If your production QueryPacingContext has additional methods (e.g., lifecycle hooks), add no-op implementations to TestQueryPacingContext.
    • If RootInternalResourceGroup requires explicit scheduling or ticking for processQueuedQueries() to be effective, ensure that this test mirrors the pattern used in other tests (e.g., calling any helper methods that drive state transitions before/after processQueuedQueries()).
  4. Placement within the class:
    • Place TestQueryPacingContext near other test helper classes in TestResourceGroups if there’s an existing convention (e.g., alongside other inner mocks).
    • Ensure the new test method is at the same level as the other @Test methods and not nested inside another method; the SEARCH/REPLACE above assumes public void testTaskLimitExceededQueuesQuery() is at class scope.

root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(10);
root.setHardConcurrencyLimit(10);

// Set task limit exceeded
root.setTaskLimitExceeded(true);

// Submit a query - it should be queued because task limit is exceeded
MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
query1.startWaitingForPrerequisites();
root.run(query1);

// Query should be queued, not running
assertEquals(query1.getState(), QUEUED);
assertEquals(root.getQueuedQueries(), 1);
assertEquals(root.getRunningQueries(), 0);
}

// Tests that queued queries start when task limit is no longer exceeded
@Test(timeOut = 10_000)
public void testQueryStartsWhenTaskLimitClears()
{
RootInternalResourceGroup root = new RootInternalResourceGroup(
"root",
(group, export) -> {},
directExecutor(),
ignored -> Optional.empty(),
rg -> false,
createNodeManager(),
createClusterResourceChecker(),
QueryPacingContext.NOOP);
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(10);
root.setHardConcurrencyLimit(10);

// Set task limit exceeded
root.setTaskLimitExceeded(true);

// Submit queries - they should be queued
MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
query1.startWaitingForPrerequisites();
root.run(query1);
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
root.run(query2);

assertEquals(query1.getState(), QUEUED);
assertEquals(query2.getState(), QUEUED);
assertEquals(root.getQueuedQueries(), 2);
assertEquals(root.getRunningQueries(), 0);

// Clear task limit
root.setTaskLimitExceeded(false);

// Process queued queries - they should now start
root.processQueuedQueries();

assertEquals(query1.getState(), RUNNING);
assertEquals(query2.getState(), RUNNING);
assertEquals(root.getQueuedQueries(), 0);
assertEquals(root.getRunningQueries(), 2);
}

// Tests that queries in a subgroup hierarchy are properly queued and started when task limit changes
@Test(timeOut = 10_000)
public void testTaskLimitExceededWithSubgroups()
{
RootInternalResourceGroup root = new RootInternalResourceGroup(
"root",
(group, export) -> {},
directExecutor(),
ignored -> Optional.empty(),
rg -> false,
createNodeManager(),
createClusterResourceChecker(),
QueryPacingContext.NOOP);
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(10);
root.setHardConcurrencyLimit(10);

InternalResourceGroup groupA = root.getOrCreateSubGroup("A", true);
groupA.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
groupA.setMaxQueuedQueries(10);
groupA.setHardConcurrencyLimit(10);

InternalResourceGroup groupG = groupA.getOrCreateSubGroup("G", true);
groupG.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
groupG.setMaxQueuedQueries(10);
groupG.setHardConcurrencyLimit(10);

// Set task limit exceeded
root.setTaskLimitExceeded(true);

// Submit a query to leaf group G - it should be queued
MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
query1.startWaitingForPrerequisites();
groupG.run(query1);

assertEquals(query1.getState(), QUEUED);
assertEquals(groupG.getQueuedQueries(), 1);
assertEquals(groupG.getRunningQueries(), 0);

// Clear task limit and process queued queries
root.setTaskLimitExceeded(false);
root.processQueuedQueries();

// Query should now be running
assertEquals(query1.getState(), RUNNING);
assertEquals(groupG.getQueuedQueries(), 0);
assertEquals(groupG.getRunningQueries(), 1);
}

// Tests that when task limit is exceeded, queries already running continue, but new ones are queued
@Test(timeOut = 10_000)
public void testTaskLimitExceededDoesNotAffectRunningQueries()
{
RootInternalResourceGroup root = new RootInternalResourceGroup(
"root",
(group, export) -> {},
directExecutor(),
ignored -> Optional.empty(),
rg -> false,
createNodeManager(),
createClusterResourceChecker(),
QueryPacingContext.NOOP);
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(10);
root.setHardConcurrencyLimit(10);

// Submit a query before task limit is exceeded - it should run
MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
query1.startWaitingForPrerequisites();
root.run(query1);
assertEquals(query1.getState(), RUNNING);

// Now set task limit exceeded
root.setTaskLimitExceeded(true);

// Submit another query - it should be queued
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
root.run(query2);
assertEquals(query2.getState(), QUEUED);

// The first query should still be running
assertEquals(query1.getState(), RUNNING);
assertEquals(root.getRunningQueries(), 1);
assertEquals(root.getQueuedQueries(), 1);
}

// Tests that task limit transitions work correctly with multiple cycles
@Test(timeOut = 10_000)
public void testTaskLimitExceededMultipleCycles()
{
RootInternalResourceGroup root = new RootInternalResourceGroup(
"root",
(group, export) -> {},
directExecutor(),
ignored -> Optional.empty(),
rg -> false,
createNodeManager(),
createClusterResourceChecker(),
QueryPacingContext.NOOP);
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(10);
root.setHardConcurrencyLimit(10);

// Cycle 1: Task limit exceeded, query queued
root.setTaskLimitExceeded(true);
MockManagedQueryExecution query1 = new MockManagedQueryExecution(0);
query1.startWaitingForPrerequisites();
root.run(query1);
assertEquals(query1.getState(), QUEUED);

// Clear task limit, query starts
root.setTaskLimitExceeded(false);
root.processQueuedQueries();
assertEquals(query1.getState(), RUNNING);

// Cycle 2: Task limit exceeded again, new query queued
root.setTaskLimitExceeded(true);
MockManagedQueryExecution query2 = new MockManagedQueryExecution(0);
query2.startWaitingForPrerequisites();
root.run(query2);
assertEquals(query2.getState(), QUEUED);
assertEquals(query1.getState(), RUNNING); // query1 still running

// Complete query1, processQueuedQueries should not start query2 (task limit still exceeded)
query1.complete();
root.processQueuedQueries();
assertEquals(query2.getState(), QUEUED); // Still queued because task limit exceeded

// Clear task limit, query2 starts
root.setTaskLimitExceeded(false);
root.processQueuedQueries();
assertEquals(query2.getState(), RUNNING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void testQueuingWhenTaskLimitExceeds()
{
ImmutableMap<String, String> extraProperties = ImmutableMap.<String, String>builder()
.put("experimental.spill-enabled", "false")
.put("experimental.max-total-running-task-count-to-not-execute-new-query", "2")
.put("max-total-running-task-count-to-not-execute-new-query", "2")
.build();

try (DistributedQueryRunner queryRunner = createQueryRunner(defaultSession, extraProperties)) {
Expand Down
Loading