Add ability to schedule splits based on Task load, not Node load. #26030
Conversation
Reviewer's Guide

This PR introduces an optional mode to schedule splits based on per-task load instead of per-node load by adding a new configuration property and session property, propagating them through NodeScheduler into SimpleNodeSelector, and implementing a new weighted selection algorithm in SimpleNodeSelector.

Sequence diagram for split assignment decision based on task load vs node load

sequenceDiagram
participant "SimpleNodeSelector"
participant "RemoteTask(s)"
participant "InternalNode(s)"
participant "NodeAssignmentStats"
participant "NodeSelectionStats"
participant "Session"
"SimpleNodeSelector"->>"Session": Check scheduleSplitsBasedOnTaskLoad property
alt scheduleSplitsBasedOnTaskLoad == true and tasks == nodes
"SimpleNodeSelector"->>"RemoteTask(s)": Get task status and split weights
"SimpleNodeSelector"->>"InternalNode(s)": Map tasks to nodes
"SimpleNodeSelector"->>"NodeAssignmentStats": Get queued splits weight
"SimpleNodeSelector"->>"NodeSelectionStats": Update stats
"SimpleNodeSelector"->>"InternalNode(s)": Assign split to least busy node (by task load)
else
"SimpleNodeSelector"->>"InternalNode(s)": Get node split weights
"SimpleNodeSelector"->>"NodeAssignmentStats": Get total splits weight
"SimpleNodeSelector"->>"NodeSelectionStats": Update stats
"SimpleNodeSelector"->>"InternalNode(s)": Assign split to least busy node (by node load)
end
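The task-load branch of the diagram boils down to picking the least busy existing task whose weight budget can still absorb the split. A minimal, hypothetical simplification (the real SimpleNodeSelector works with SplitWeight, RemoteTask, and NodeAssignmentStats objects rather than raw longs):

```java
// Hypothetical simplification of the task-load-based selection the diagram
// describes: choose the task (and thus its node) with the smallest combined
// split weight, provided adding the new split stays under the per-task cap.
public class TaskLoadSelectionSketch
{
    public static int pickLeastLoadedTask(long[] taskWeights, long splitWeight, long maxWeight)
    {
        long minWeight = Long.MAX_VALUE;
        int chosen = -1;
        for (int i = 0; i < taskWeights.length; i++) {
            boolean canAssign = taskWeights[i] + splitWeight <= maxWeight;
            if (canAssign && taskWeights[i] < minWeight) {
                minWeight = taskWeights[i];
                chosen = i; // least busy task seen so far
            }
        }
        return chosen; // -1 if no task can take the split
    }
}
```

The fallback (else) branch is the pre-existing behavior: the same kind of minimum search, but over per-node weights from NodeAssignmentStats instead of per-task weights.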
Class diagram for updated NodeSchedulerConfig and related properties

classDiagram
class NodeSchedulerConfig {
int minCandidates
boolean includeCoordinator
int maxSplitsPerNode
int maxSplitsPerTask
boolean scheduleSplitsBasedOnTaskLoad
int maxPendingSplitsPerTask
int maxUnacknowledgedSplitsPerTask
String networkTopology
int getMaxSplitsPerTask()
NodeSchedulerConfig setMaxSplitsPerTask(int)
boolean isScheduleSplitsBasedOnTaskLoad()
NodeSchedulerConfig setScheduleSplitsBasedOnTaskLoad(boolean)
}
class SystemSessionProperties {
+SCHEDULE_SPLITS_BASED_ON_TASK_LOAD : String
+isScheduleSplitsBasedOnTaskLoad(Session) : Boolean
}
NodeSchedulerConfig <.. SystemSessionProperties : uses
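Based on the diagram, the new flag follows the standard Airlift config-bean pattern used elsewhere in NodeSchedulerConfig. A sketch (the class name and default here are assumptions; in the real class the setter carries @Config("node-scheduler.schedule-splits-based-on-task-load"), and the review below asks for a @ConfigDescription as well):

```java
// Sketch of the new flag in the Airlift-style config pattern. The real
// setter is annotated with @Config("node-scheduler.schedule-splits-based-on-task-load")
// and, per the review feedback, should also carry a @ConfigDescription.
public class NodeSchedulerConfigSketch
{
    private boolean scheduleSplitsBasedOnTaskLoad; // assumed default: false

    public boolean isScheduleSplitsBasedOnTaskLoad()
    {
        return scheduleSplitsBasedOnTaskLoad;
    }

    public NodeSchedulerConfigSketch setScheduleSplitsBasedOnTaskLoad(boolean scheduleSplitsBasedOnTaskLoad)
    {
        this.scheduleSplitsBasedOnTaskLoad = scheduleSplitsBasedOnTaskLoad;
        return this; // fluent setters are the Airlift convention
    }
}
```

The matching session property (SCHEDULE_SPLITS_BASED_ON_TASK_LOAD in SystemSessionProperties) lets the behavior be toggled per query, with the config value as the default.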
Class diagram for updated NodeScheduler and SimpleNodeSelector

classDiagram
class NodeScheduler {
int minCandidates
boolean includeCoordinator
long maxSplitsWeightPerNode
long maxSplitsWeightPerTask
long maxPendingSplitsWeightPerTask
NodeTaskMap nodeTaskMap
NodeSelector createNodeSelector(Session, ConnectorId, Supplier<NodeMap>)
}
class SimpleNodeSelector {
NodeSelectionStats nodeSelectionStats
NodeTaskMap nodeTaskMap
boolean includeCoordinator
boolean scheduleSplitsBasedOnTaskLoad
AtomicReference<Supplier<NodeMap>> nodeMap
int minCandidates
long maxSplitsWeightPerNode
long maxSplitsWeightPerTask
long maxPendingSplitsWeightPerTask
int maxUnacknowledgedSplitsPerTask
int maxTasksPerStage
Optional<InternalNodeInfo> chooseLeastBusyNodeBasedOnTaskLoad(...)
Optional<InternalNodeInfo> chooseLeastBusyNode(...)
}
NodeScheduler o-- SimpleNodeSelector : creates
File-Level Changes
Possibly linked issues
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java:242` </location>
<code_context>
+ protected Optional<InternalNodeInfo> chooseLeastBusyNodeBasedOnTaskLoad(SplitWeight splitWeight, List<RemoteTask> existingTasks, OptionalInt preferredNodeCount, long maxSplitsWeight, NodeAssignmentStats assignmentStats)
</code_context>
<issue_to_address>
Consider handling the case where node is null more explicitly.
If node is unexpectedly null, consider logging a warning or throwing an exception to improve debugging and visibility.
Suggested implementation:
```java
InternalNode node = nodeMap.getActiveNodesByNodeId().get(remoteTask.getNodeId());
if (node == null) {
// Log a warning for unexpected null node
log.warn("Node with ID %s is not found in active nodes. This should not happen.", remoteTask.getNodeId());
// Optionally, throw an exception to fail fast and aid debugging
// throw new IllegalStateException("Node with ID " + remoteTask.getNodeId() + " is not found in active nodes.");
continue;
}
```
1. Ensure that a suitable logger (e.g., `private static final Logger log = Logger.get(SimpleNodeSelector.class);`) is present at the top of the class. If not, add it.
2. Decide whether to log only, throw an exception, or both, depending on your desired failure mode. The code above logs a warning and continues, but you can uncomment the exception if you want to fail fast.
</issue_to_address>
### Comment 2
<location> `presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java:268` </location>
<code_context>
+ continue;
+ }
+ TaskStatus taskStatus = remoteTask.getTaskStatus();
+ long currentWeight = taskStatus.getQueuedPartitionedSplitsWeight() + taskStatus.getRunningPartitionedSplitsWeight() + assignmentStats.getQueuedSplitsWeightForStage(node);
+ boolean canAssignToNode = canAssignSplitBasedOnWeight(currentWeight, maxSplitsWeight, splitWeight);
+
</code_context>
<issue_to_address>
The calculation of currentWeight may double-count queued splits.
Ensure that the queued splits weight from assignmentStats and taskStatus are not overlapping to avoid double-counting.
</issue_to_address>
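To make the overlap concern concrete, a sketch (hypothetical helper; the names are illustrative, not from the PR): the sum is only correct if the three components are disjoint, i.e. the locally tracked weight covers only splits the task has not yet reported back.

```java
// Hypothetical illustration of avoiding double-counting: the task status
// reports weights for splits the worker already knows about, so the local
// assignment stats should contribute only splits queued on the coordinator
// that the task has not yet acknowledged.
public class WeightAccountingSketch
{
    public static long currentWeight(long reportedQueued, long reportedRunning, long unacknowledgedLocal)
    {
        // If unacknowledgedLocal also contained splits already included in
        // reportedQueued, those splits would be counted twice.
        return reportedQueued + reportedRunning + unacknowledgedLocal;
    }
}
```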
### Comment 3
<location> `presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeSchedulerConfig.java:137` </location>
<code_context>
+ }
+
+ @Config("node-scheduler.schedule-splits-based-on-task-load")
+ public NodeSchedulerConfig setScheduleSplitsBasedOnTaskLoad(boolean scheduleSplitsBasedOnTaskLoad)
+ {
+ this.scheduleSplitsBasedOnTaskLoad = scheduleSplitsBasedOnTaskLoad;
</code_context>
<issue_to_address>
Missing @ConfigDescription for scheduleSplitsBasedOnTaskLoad.
Please add the annotation to enhance documentation and make the configuration option easier to find.
</issue_to_address>
```java
protected Optional<InternalNodeInfo> chooseLeastBusyNodeBasedOnTaskLoad(SplitWeight splitWeight, List<RemoteTask> existingTasks, OptionalInt preferredNodeCount, long maxSplitsWeight, NodeAssignmentStats assignmentStats)
{
    long minWeight = Long.MAX_VALUE;
    InternalNode chosenNode = null;
    NodeMap nodeMap = this.nodeMap.get().get();
    for (int i = 0; i < existingTasks.size(); i++) {
        RemoteTask remoteTask = existingTasks.get(i);

        InternalNode node = nodeMap.getActiveNodesByNodeId().get(remoteTask.getNodeId());
        // TODO(spershin): This, ideally, should not happen. Should we throw instead?
```
```java
            continue;
        }
        TaskStatus taskStatus = remoteTask.getTaskStatus();
        long currentWeight = taskStatus.getQueuedPartitionedSplitsWeight() + taskStatus.getRunningPartitionedSplitsWeight() + assignmentStats.getQueuedSplitsWeightForStage(node);
```
60b528c to e2c80ce
steveburnett left a comment:
LGTM! (docs)
Pull branch, local doc build, looks good. Thanks!
I assume that a corresponding session property does not exist?

@steveburnett

I suppose I do. If it existed I'd ask for it to be added to the doc, and cross-linked like in this example screenshot of a different configuration property, but if there is no session property, no worries. Thanks!
@steveburnett I looked up a few session props around and didn't see them in any documentation.
Thanks!

Session property documentation is in properties-session.rst. Look at line 687 in https://github.com/prestodb/presto/blob/master/presto-docs/src/main/sphinx/admin/properties.rst for a cross-reference entry linking to properties-session.rst. The session property doc entry should also have a cross-reference link to the configuration property. Let me know if that answered your questions!
Yes, that helps. Actually, it seems like a lot of session props are NOT in properties-session.rst.
e2c80ce to bb236ab
Thanks, that's appreciated!
Great point! Yes, it is normal - but it is also not the desired state of the documentation. Identifying the missing ones and documenting them, as well as any missing configuration properties, is something I want to address when possible. Until then I ask questions like these in new PRs to try to keep the doc gap from getting larger :). Thanks for your help!
steveburnett left a comment:
LGTM! (docs)
Pull updated branch, new local doc build, looks good. Thanks!
bb236ab to e90fd75
99cde6f to 9651173

0484ca7 to 44f7001
rschlussel left a comment:
Small comment for the test. Otherwise looks good. Thanks!
44f7001 to a86be1a

Description
Adds a system configuration property and a session property to enable split scheduling based on task load rather than node load.
Motivation and Context
See #25906
Impact
This is particularly useful for the native worker, as it runs splits for tasks differently than the Java worker.
Reduces query execution time across the board (see the issue for details).
It might be worth considering only the queued splits for each task, rather than running + queued. The running splits are already executing, and whether they are slow or fast is reflected in the number of queued splits, so the running count matters only in the rare window when some splits have completed but their queued replacements have not yet started while stats are being reported to the coordinator. That is a rare event, and some race conditions like that are unavoidable in any case.
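The queued-only idea could be compared to the current scheme like this (hypothetical illustration; not part of this PR):

```java
// Hypothetical comparison of the two weighting schemes discussed above:
// full load (queued + running) versus queued-only.
public class QueuedOnlyWeightSketch
{
    public static long fullLoad(long queued, long running)
    {
        return queued + running;
    }

    public static long queuedOnly(long queued, long running)
    {
        // Running splits are already executing; slow runners show up as a
        // growing queue, so queue length alone may be a sufficient signal.
        return queued;
    }
}
```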
Test Plan
Not sure.
So far, ran this in the dev cluster with a shadowed real-life workload.