fix(scheduler): Coordinator Task Throttling Bug #27146
feilong-liu merged 1 commit into prestodb:master from
Conversation
Reviewer's Guide

Adjusts coordinator task-based throttling so new and existing resource groups become/remain eligible while preventing new query starts when task limits are exceeded, and adds tests plus config renames to cover the behavior and remove the experimental flag from the task limit property.

Sequence diagram for starting next query with updated task limit check

sequenceDiagram
participant Scheduler
participant InternalResourceGroup
participant RootInternalResourceGroup
Scheduler->>InternalResourceGroup: internalStartNext()
activate InternalResourceGroup
InternalResourceGroup->>RootInternalResourceGroup: isTaskLimitExceeded()
RootInternalResourceGroup-->>InternalResourceGroup: taskLimitExceeded
alt task limit exceeded
InternalResourceGroup-->>Scheduler: return false
else task limit not exceeded
InternalResourceGroup->>InternalResourceGroup: canRunMore()
alt cannot run more based on group limits
InternalResourceGroup-->>Scheduler: return false
else can run more
InternalResourceGroup->>InternalResourceGroup: select next queued query
InternalResourceGroup->>InternalResourceGroup: startInBackground(nextQuery)
InternalResourceGroup-->>Scheduler: return true
end
end
deactivate InternalResourceGroup
note over Scheduler,InternalResourceGroup: Task limit check moved from canRunMore to internalStartNext so eligibility updates still occur while preventing new query starts
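The behavioral change the note describes can be sketched as a minimal, self-contained Java model. This is illustrative only: `GroupSketch` and its members are invented stand-ins for the real `InternalResourceGroup`/`RootInternalResourceGroup` classes, not the actual Presto implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.BooleanSupplier;

// Minimal model of the fix: the task-limit check lives in internalStartNext(),
// not canRunMore(), so a throttled group still reports itself as eligible.
class GroupSketch
{
    private final BooleanSupplier taskLimitExceeded; // stands in for root.isTaskLimitExceeded()
    private final int hardConcurrencyLimit;
    private final Queue<Runnable> queuedQueries = new ArrayDeque<>();
    private int runningQueries;

    GroupSketch(BooleanSupplier taskLimitExceeded, int hardConcurrencyLimit)
    {
        this.taskLimitExceeded = taskLimitExceeded;
        this.hardConcurrencyLimit = hardConcurrencyLimit;
    }

    void enqueue(Runnable query)
    {
        queuedQueries.add(query);
    }

    // No task-limit consultation here, so eligibility updates still occur
    // while the coordinator is throttled.
    boolean canRunMore()
    {
        return runningQueries < hardConcurrencyLimit;
    }

    // The task-limit check moved here: an eligible group simply declines to
    // start anything while the coordinator is over its task limit.
    boolean internalStartNext()
    {
        if (taskLimitExceeded.getAsBoolean()) {
            return false;
        }
        if (!canRunMore() || queuedQueries.isEmpty()) {
            return false;
        }
        queuedQueries.poll().run();
        runningQueries++;
        return true;
    }

    int queuedCount()
    {
        return queuedQueries.size();
    }
}
```

With this split, a group created during a throttling period is marked eligible (via `canRunMore()`), and once throttling ends, the normal scheduling loop can start its queued queries without needing a new query to arrive.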
Class diagram for updated resource group throttling and config

classDiagram
class InternalResourceGroup {
+run(query)
+internalStartNext() boolean
-canRunMore() boolean
-queuedQueries
-runningQueries
}
class RootInternalResourceGroup {
+isTaskLimitExceeded() boolean
}
class QueryPacingContext {
+tryAcquireAdmissionSlot() boolean
}
class QueryManagerConfig {
-maxTotalRunningTaskCountToNotExecuteNewQuery int
+getMaxTotalRunningTaskCountToNotExecuteNewQuery() int
+setMaxTotalRunningTaskCountToNotExecuteNewQuery(maxTotalRunningTaskCountToNotExecuteNewQuery int) QueryManagerConfig
}
InternalResourceGroup --> RootInternalResourceGroup : uses
InternalResourceGroup --> QueryPacingContext : uses
QueryManagerConfig ..> RootInternalResourceGroup : configures task limit
note for QueryManagerConfig "Config key renamed from experimental.max-total-running-task-count-to-not-execute-new-query to max-total-running-task-count-to-not-execute-new-query"
Hey - I've found 3 issues, and left some high level feedback:
- The task-limit checks are now split between `run()` (via `startQuery`/`coordOverloaded`) and `internalStartNext()` (direct `isTaskLimitExceeded()`); consider consolidating this admission logic into a single helper to avoid subtle divergence in future changes.
- All the new tests in `TestResourceGroups` duplicate the same `RootInternalResourceGroup` setup boilerplate; extracting a small factory/helper for the common configuration would make the tests more readable and less error-prone to modify.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The task-limit checks are now split between `run()` (via `startQuery`/`coordOverloaded`) and `internalStartNext()` (direct `isTaskLimitExceeded()`); consider consolidating this admission logic into a single helper to avoid subtle divergence in future changes.
- All the new tests in `TestResourceGroups` duplicate the same `RootInternalResourceGroup` setup boilerplate; extracting a small factory/helper for the common configuration would make the tests more readable and less error-prone to modify.
## Individual Comments
### Comment 1
<location> `presto-main-base/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroup.java:745-746` </location>
<code_context>
- if (immediateStartCandidate && queryPacingContext.tryAcquireAdmissionSlot()) {
+ boolean startQuery = immediateStartCandidate;
+ if (immediateStartCandidate) {
+ // Check for coordinator overload (task limit exceeded or denied admission)
+ boolean coordOverloaded = !queryPacingContext.tryAcquireAdmissionSlot()
+ || ((RootInternalResourceGroup) root).isTaskLimitExceeded();
+ if (coordOverloaded) {
</code_context>
<issue_to_address>
**issue (bug_risk):** Admission slot may be leaked when task limit is exceeded after a successful acquire
With this change, `coordOverloaded` is true when either `tryAcquireAdmissionSlot()` fails or `isTaskLimitExceeded()` is true. In the case where `tryAcquireAdmissionSlot()` succeeds but `isTaskLimitExceeded()` is true, `startQuery` becomes false and `startInBackground` is never called, but the acquired admission slot is never released. This leaks slots and will reduce effective admission capacity over time. Please either check `isTaskLimitExceeded()` before acquiring the slot, or explicitly release the slot when `isTaskLimitExceeded()` prevents the query from starting.
</issue_to_address>
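The leak-free ordering the comment suggests can be modeled with a small, self-contained sketch. This is not Presto code: `AdmissionSketch` and its methods are invented for illustration, with a `Semaphore` standing in for the pacing context's admission slots.

```java
import java.util.concurrent.Semaphore;
import java.util.function.BooleanSupplier;

// Illustrative model of leak-free admission: check the cheap task-limit flag
// BEFORE taking an admission slot, so a throttled coordinator never acquires
// (and then forgets to release) a slot.
class AdmissionSketch
{
    private final Semaphore slots;
    private final BooleanSupplier taskLimitExceeded;

    AdmissionSketch(int maxSlots, BooleanSupplier taskLimitExceeded)
    {
        this.slots = new Semaphore(maxSlots);
        this.taskLimitExceeded = taskLimitExceeded;
    }

    boolean tryStart()
    {
        if (taskLimitExceeded.getAsBoolean()) {
            return false; // no slot was taken, so there is nothing to release
        }
        if (!slots.tryAcquire()) {
            return false; // pacing denied admission
        }
        // Caller now owns one slot and must release it when the query finishes.
        return true;
    }

    int availableSlots()
    {
        return slots.availablePermits();
    }
}
```

The alternative fix mentioned in the comment is equivalent: keep the acquire first, but call the release method on the task-limit branch so the permit count stays balanced.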
### Comment 2
<location> `presto-main-base/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java:324` </location>
<code_context>
}
- @Config("experimental.max-total-running-task-count-to-not-execute-new-query")
+ @Config("max-total-running-task-count-to-not-execute-new-query")
@ConfigDescription("Keep new queries in the queue if total task count exceeds this threshold")
public QueryManagerConfig setMaxTotalRunningTaskCountToNotExecuteNewQuery(int maxTotalRunningTaskCountToNotExecuteNewQuery)
</code_context>
<issue_to_address>
**issue (bug_risk):** Renaming the config key may break existing deployments unless backward compatibility is handled
Removing the `experimental.` prefix changes the config key and will cause existing configs using the old name to be ignored unless an alias/migration is provided. If this change is intentional, ensure there’s a documented upgrade path; otherwise, consider supporting both keys (e.g., via `@LegacyConfig` or similar) so existing limits continue to apply after upgrade.
</issue_to_address>
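One common pattern for this in Presto's Airlift-based configuration is to keep the old key as a legacy alias on the same setter. A sketch of what that might look like, assuming the standard `@LegacyConfig` annotation is available alongside `@Config` (this is a non-runnable fragment of the existing setter, not a verified diff):

```
@Config("max-total-running-task-count-to-not-execute-new-query")
@LegacyConfig("experimental.max-total-running-task-count-to-not-execute-new-query")
@ConfigDescription("Keep new queries in the queue if total task count exceeds this threshold")
public QueryManagerConfig setMaxTotalRunningTaskCountToNotExecuteNewQuery(int maxTotalRunningTaskCountToNotExecuteNewQuery)
{
    this.maxTotalRunningTaskCountToNotExecuteNewQuery = maxTotalRunningTaskCountToNotExecuteNewQuery;
    return this;
}
```

With the alias in place, deployments that still set the `experimental.`-prefixed key keep their configured limit after upgrading, while new deployments use the unprefixed key.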
### Comment 3
<location> `presto-main-base/src/test/java/com/facebook/presto/execution/resourceGroups/TestResourceGroups.java:1114` </location>
<code_context>
+ rg -> false,
+ createNodeManager(),
+ createClusterResourceChecker(),
+ QueryPacingContext.NOOP);
+ root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
+ root.setMaxQueuedQueries(10);
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test that covers the `tryAcquireAdmissionSlot` failure path (coordinator overload without task-limit flag)
The production logic now treats the coordinator as overloaded if either `tryAcquireAdmissionSlot()` fails or `taskLimitExceeded` is true, but the new tests only cover the `taskLimitExceeded` branch. Since `QueryPacingContext.NOOP` always admits, the `tryAcquireAdmissionSlot()` failure path isn’t exercised. Please add a test-only `QueryPacingContext` that can return `false` from `tryAcquireAdmissionSlot()`, and verify that (1) immediate-start candidates are queued instead of started, and (2) once slots are available, `processQueuedQueries()` starts the queued queries, to fully validate the `coordOverloaded` logic.
Suggested implementation:
```java
private static final class TestQueryPacingContext
implements QueryPacingContext
{
private final java.util.concurrent.atomic.AtomicBoolean admit;
private final java.util.concurrent.atomic.AtomicInteger inUseSlots = new java.util.concurrent.atomic.AtomicInteger();
private TestQueryPacingContext(boolean initialAdmit)
{
this.admit = new java.util.concurrent.atomic.AtomicBoolean(initialAdmit);
}
@Override
public boolean tryAcquireAdmissionSlot()
{
if (!admit.get()) {
return false;
}
inUseSlots.incrementAndGet();
return true;
}
@Override
public void releaseAdmissionSlot()
{
inUseSlots.decrementAndGet();
}
public void setAdmit(boolean value)
{
admit.set(value);
}
public int getInUseSlots()
{
return inUseSlots.get();
}
}
@Test
public void testAdmissionSlotFailureQueuesAndStartsQueries()
{
TestQueryPacingContext pacingContext = new TestQueryPacingContext(false);
RootInternalResourceGroup root = new RootInternalResourceGroup(
"root",
(group, export) -> {},
directExecutor(),
ignored -> Optional.empty(),
rg -> false,
createNodeManager(),
createClusterResourceChecker(),
pacingContext);
root.setSoftMemoryLimit(new DataSize(1, MEGABYTE));
root.setMaxQueuedQueries(10);
root.setHardConcurrencyLimit(10);
// No task limit; overload purely due to admission slots
root.setTaskLimitExceeded(false);
// Immediate-start candidate should be queued because tryAcquireAdmissionSlot() fails
MockManagedQueryExecution first = new MockManagedQueryExecution(0);
first.startWaitingForPrerequisites();
root.run(first);
assertEquals(root.getQueuedQueries(), 1);
assertFalse(first.getState().isDone());
// Allow admissions and process queued queries
pacingContext.setAdmit(true);
root.processQueuedQueries();
assertEquals(root.getQueuedQueries(), 0);
assertTrue(first.isDone());
}
public void testTaskLimitExceededQueuesQuery()
```
To integrate this cleanly with the existing file, you’ll likely need to:
1. **Adjust the `QueryPacingContext` method signatures** if they differ in your codebase. For example, if `tryAcquireAdmissionSlot` takes parameters (e.g., the resource group or query), update the overrides in `TestQueryPacingContext` to match, and delegate admission behavior based on those parameters as needed.
2. **Add imports if they aren’t already present** at the top of `TestResourceGroups.java`:
- `import java.util.Optional;` (likely already there)
- `import org.testng.annotations.Test;` (likely already there for other tests)
- `import static org.testng.Assert.assertEquals;`
- `import static org.testng.Assert.assertFalse;`
- `import static org.testng.Assert.assertTrue;`
- If you prefer, replace the fully qualified `java.util.concurrent.atomic.AtomicBoolean/AtomicInteger` with imports for those classes.
3. **Ensure concurrency/accounting expectations**:
- If your production `QueryPacingContext` has additional methods (e.g., lifecycle hooks), add no-op implementations to `TestQueryPacingContext`.
- If `RootInternalResourceGroup` requires explicit scheduling or ticking for `processQueuedQueries()` to be effective, ensure that this test mirrors the pattern used in other tests (e.g., calling any helper methods that drive state transitions before/after `processQueuedQueries()`).
4. **Placement within the class**:
- Place `TestQueryPacingContext` near other test helper classes in `TestResourceGroups` if there’s an existing convention (e.g., alongside other inner mocks).
- Ensure the new test method is at the same level as the other `@Test` methods and not nested inside another method; the SEARCH/REPLACE above assumes `public void testTaskLimitExceededQueuesQuery()` is at class scope.
</issue_to_address>
Summary: With coordinator task-based throttling (queueing) enabled, we run into an issue where certain resource groups are never updated to be eligible to run. This occurs when the resource group is created during a task throttling period and canRun returns false, resulting in the resource group never being added as an eligible subgroup on creation. When we exit task throttling, an eligibility update is never triggered. If this group doesn't have a new query added after we exit task throttling, its status is never updated.

Changes:
1. Move the isTaskLimitExceeded check from canRunMore to internalStartNext; canRunMore will return true, allowing the group to be marked as eligible, but internalStartNext will prevent the group from running more queries.
2. Add a check to enqueue immediate execution candidates during task throttling.
3. Remove "experimental" from the session property.
4. Add tests to ensure resource groups properly queue/run queries with task limits (should this be in resourceGroups or testQueryTaskLimit?).

Reviewed By: spershin

Differential Revision: D92632990
Description
With coordinator task-based throttling (queueing) enabled, we run into an issue where certain resource groups are never updated to be eligible to run. This occurs when the resource group is created during a task throttling period and canRun returns false, resulting in the resource group never being added as an eligible subgroup on creation. When we exit task throttling, an eligibility update is never triggered. If this group doesn't have a new query added after we exit task throttling, its status is never updated.
Changes:
1. Move the isTaskLimitExceeded check from canRunMore to internalStartNext; canRunMore will return true, allowing the group to be marked as eligible, but internalStartNext will prevent the group from running more queries.
2. Add a check to enqueue immediate execution candidates during task throttling.
3. Remove "experimental" from the session property.
4. Add tests to ensure resource groups properly queue/run queries with task limits.
Meta Internal review by: spershin
Meta Internal Differential Revision: D92632990
Motivation and Context
Coordinator memory is being overloaded by queries with large task counts. There need to be safeguards on this beyond resource groups alone. The existing coordinator task throttling property has some issues, which are fixed by this PR.
Impact
Coordinator task throttling no longer causes stuck resource groups.
The config key is renamed from experimental.max-total-running-task-count-to-not-execute-new-query to max-total-running-task-count-to-not-execute-new-query; however, the old key is kept as a legacy config for backwards compatibility.
Coordinator task throttling, when used in conjunction with query-pacing, should limit the number of tasks on the cluster close to the limit.
Test Plan
Bug Reproduction

Set task limit to 1 on a test cluster.
Trigger multiple queries that peak at 10-30 tasks and have execution times of 10-30 seconds.
Repro with a larger query suite.

Test:

Build and push again to the test cluster, and test the previous repro.
Seems to kick in as expected: the cluster submits a lot of queries as running before TaskLimitExceeded fires, after which it runs 1-3 queries at a time for the remainder of the queue. However, the cluster still seemed to admit queries slowly even in a task throttling state.
Following the previous fix, it was noticed that internalStartNext would not prevent immediate executions, only queued queries. This was then patched to block immediate executions during task throttling periods, preventing queries from running while in a task throttling state.

Test with second fix
The spikes in this fix occur because multiple queries can be admitted with no pacing before re-entering the task throttling state. With query pacing, this should be mitigated.
Contributor checklist
Summary by Sourcery
Fix coordinator task-based throttling so resource groups correctly queue and start queries when task limits are exceeded and later cleared.
Bug Fixes:
Enhancements:
Tests:
Release Notes
Please follow release notes guidelines and fill in the release notes below.