Support max_tasks_per_stage for scan #13477
Conversation
presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeSelector.java
presto-main/src/main/java/com/facebook/presto/execution/scheduler/SimpleNodeSelector.java
Why doesn't the TopologyAwareNodeSelector respect the limit?
I am not sure it makes sense to use the limit here. The topology-aware selector selects the best nodes for splits to maximize locality. I am not sure we can even apply a limit here.
@cemcayiroglu : Note even for SimpleNodeSelector, when the splits don't support remote access (e.g., Raptor), the limit is not applied there. cc @highker
On the other hand, for TopologyAwareNodeSelector, the current algorithm makes a best effort to co-locate data and compute nodes. Thus we should still be able to apply a limit over the candidate nodes and do best-effort topology-aware selection. Does that make sense?
Agree, let's hide limit from those non-applicable cases.
presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java
A ResettableRandomizedIterator is basically just a List of the original data:
Should we just keep a List here? In that case we can just use NodeSelector#selectRandomNodes. @arhimondr?
I remember this discussion happened before; let me see if I can find the previous discussion.
Instead of making limit a parameter to computeAssignments, another idea is to allow constructing the NodeSelector over a random set of nodes. For example, today we construct the NodeSelector through:
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId);
We can have a different API:
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId, maxNodeCount);
I am suggesting this because today, once NodeSelector#computeAssignments is called, it memorizes randomCandidates based on the limit. That makes NodeSelector stateful, which is a bit difficult to reason about (e.g., what happens when the NodeSelector is called in other places?)
What do you think? @highker , @arhimondr ?
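The two factory shapes being discussed could be sketched roughly as follows. This is a hypothetical stand-in, not the real Presto code: plain strings replace ConnectorId, and a one-method interface replaces the real NodeSelector; only the shape of the two createNodeSelector overloads is taken from the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NodeSelectorFactorySketch
{
    // Minimal stand-in for the real NodeSelector interface
    interface NodeSelector
    {
        List<String> candidateNodes();
    }

    // Hypothetical cluster; real code would consult the NodeManager
    static final List<String> CLUSTER = Arrays.asList("node1", "node2", "node3", "node4");

    // Today's shape: the selector sees every node in the cluster
    public static NodeSelector createNodeSelector(String connectorId)
    {
        return () -> CLUSTER;
    }

    // Proposed shape: fix a random subset of at most maxNodeCount nodes at
    // construction time, so computeAssignments needs no per-call limit state
    public static NodeSelector createNodeSelector(String connectorId, int maxNodeCount)
    {
        List<String> shuffled = new ArrayList<>(CLUSTER);
        Collections.shuffle(shuffled);
        List<String> subset = shuffled.subList(0, Math.min(maxNodeCount, shuffled.size()));
        return () -> subset;
    }
}
```

The appeal of the second overload is that the limit is applied exactly once, at construction, instead of being re-applied (and memorized) inside computeAssignments.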
I think @wenleix's suggestion is much better. Having the limit on the interface is not intuitive to understand.
I think the direction Cem is heading in is that node selection won't be random; it will be based on resources. So constructing the node selector with a subset of the nodes would mean it could no longer make resource-based decisions.
Even today we have a TopologyAwareNodeSelector, and if we passed it a random set of nodes at construction then it might be restricted in what topology-aware scheduling it can do.
@wenleix @highker @aweisberg Thanks for the input. I agree with all of you. I had trouble deciding where to place the limit. Actually, there is a bigger problem here that creates confusion. DynamicSplitPlacementPolicy should define the node selection criteria, like whether access is remote or limited, etc. Today it does almost nothing and delegates everything to the node selector. BucketedSplitPlacementPolicy is very similar. I think the node selector should select nodes based on the criteria provided by SplitPlacementPolicy. As @aweisberg said, we are going to have different policies in the future to select nodes for memory-aware scheduling. I am thinking of following what @wenleix described and hiding the limit. I will create a follow-up PR for refactoring these parts to get ready for memory-aware scheduling. Any thoughts?
After a second thought, I will pass session to createNodeSelector instead of limit. nodeScheduler will access the limit if it is needed. Cleaner this way.
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId, session);
The other important question is that node selection currently happens continuously. By memoizing the selected nodes we are breaking the current semantics. I'm not sure how important that is. Maybe it is not.
But if we really want to keep the semantics, then in the DynamicSplitPlacementPolicy the nodes have to be selected every time. It should also keep track of the nodes that already have some splits assigned. This list should be extended every time a new node is selected, and once the size limit is reached, the extension may be terminated and only the already selected nodes scheduled.
@arhimondr this sounds great! Do you think it is better to handle the refactoring part in another PR? You are also right about the comment on the semantics. The current approach can cause a problem if the initial split size is lower than the limit and the next call goes over the limit: not enough nodes will be selected. I am going to make this change or find another way to avoid this.
@arhimondr Actually we need to keep it as is if we want to keep the semantics the same. The current code does not support node addition. The NodeMap is created by createNodeSelector and stays the same throughout the query execution. If it is OK I would like to merge it as is.
I would like to suggest changing the second method signature to something like SplitPlacementResult computeAssignments(Set splits, List existingTasks, List selectedNodes)
upd: per offline discussion it is not possible, as it would break topology-aware node selection.
With topology-aware node selection, nodes cannot be selected from the outside.
highker
left a comment
The limit on the interface may need some cleaning.
It's not intuitive that this returns the same list once it is constructed, regardless of the parameters. Is this all done exactly once? If it's done multiple times, why is it guaranteed that parameters like limit haven't changed?
If this needs to be the case, then should it error if the parameters have changed?
What behavior are we testing here where the same set of nodes is returned each time? When would we want to get the list of nodes multiple times deterministically?
I still don't like the idea of memoizing. The NodeMap right now is mutable (although with 5-second memoization). Fixing the selected nodes will change the existing semantics and cluster behaviour. Although I don't know how big a problem that is, I would still prefer to keep the behaviour the same.
The List<RemoteTask> existingTasks is the list of existing tasks.
RemoteTask#getNodeId returns the assigned node id of the task. To keep the existing semantics we should probably change the algorithm to something like:
- Get all the nodes
- Filter out all the nodes that already have tasks
- Select random N nodes from the remaining nodes, where N = maxTasksPerStage - existingTasks.size()
- Union the selected nodes with the nodes of the existing tasks
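The steps above could be sketched like this. This is a hypothetical helper, not Presto code: plain node-id strings stand in for InternalNode, and existingTaskNodeIds stands in for the ids obtained via RemoteTask#getNodeId.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class StageNodeSelectionSketch
{
    public static Set<String> selectNodes(Set<String> allNodes, Set<String> existingTaskNodeIds, int maxTasksPerStage)
    {
        // Nodes that already run tasks for this stage always stay selected
        Set<String> selected = new LinkedHashSet<>(existingTaskNodeIds);

        // Filter out the nodes that already have tasks
        List<String> remaining = new ArrayList<>();
        for (String node : allNodes) {
            if (!existingTaskNodeIds.contains(node)) {
                remaining.add(node);
            }
        }

        // Select random N nodes from the remaining nodes,
        // where N = maxTasksPerStage - existingTasks.size() (never negative)
        Collections.shuffle(remaining);
        int additional = Math.max(0, maxTasksPerStage - existingTaskNodeIds.size());
        selected.addAll(remaining.subList(0, Math.min(additional, remaining.size())));
        return selected;
    }
}
```

Because the selection is recomputed on every call against the current node set, newly joined workers remain eligible until the per-stage limit is reached, which is the semantics-preserving property being argued for.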
The interface now is really weird.
nodeScheduler.createNodeSelector now accepts maxTasksPerStage. And then nodeSelector.selectRandomNodes accepts maxTasksPerStage too.
I wonder if passing the limit to NodeSelector#computeAssignments is the lesser evil?
CC: @wenleix
I think changing NodeSelector#computeAssignments is more evil because it would require changing TopologyAwareNodeSelector too.
I think changing NodeSelector#computeAssignments is more evil
Let me elaborate a little bit more on this:
Creating the NodeSelector with the limit (which in fact is maxTasksPerStage) is quite confusing.
The NodeSelector has other methods that accept a limit as a parameter. Thus it is not clear which limit will be applied, say, in NodeSelector#selectRandomNodes: the maxTasksPerStage limit? The selectRandomNodes(limit) one? Or their min() / max()?
it will require to change TopologyAwareNodeSelector too.
Currently it is simply ignored there anyway, so I don't see a big difference.
Anyhow, I don't feel particularly strongly about this. Feel free to leave it as is if you do.
@arhimondr, @cemcayiroglu: If we are OK with having maxTasksPerStage when creating the NodeSelector, do we want to just pre-compute a limited set of nodes when the NodeSelector is created?
Thus the only reason we need to consult nodeManager during computeAssignments is when the query is submitted at cluster initialization time, which is not a common case.
Thus the only reason we need to consult nodeManager during computeAssignments is when the query is submitted at cluster initialization time, which is not a common case.
I'm not sure how important that is in practice, but I think it is better to preserve the existing behaviour.
Looks like this part of the code is super messy. The node scheduler and node selector are calling each other; no one knows who is doing what. I am creating an issue to address this. Please look: https://en.wikipedia.org/wiki/Spaghetti_code
nit: unrelated change, please revert
arhimondr
left a comment
LGTM % minor comments on testing and small performance optimizations.
@aweisberg @highker @wenleix Do you guys want to have another look?
suggestion:
This code is gonna be called for every batch of splits. We can make it more efficient by:
- Collecting it here into a List
- Creating a set only in the less "likely" branch (alreadySelectedNodeCount < limit), right before it has to be passed to selectNodes
Agreed with @arhimondr. Calling it per each computeAssignments call might be expensive.
Also, we usually have new lines for stream API calls:
existingTasks.stream()
        .map(remoteTask -> nodeMap.getNodesByNodeId().get(remoteTask.getNodeId()))
        .collect(toSet());
I wonder if it makes sense to exercise more corner cases here:
- Select one node
- Call the second time, make sure that the second node is selected, and it is different from the first one
- Call the third time, make sure that no additional nodes have been selected
- Add only a single node to the InMemoryNodeManager
- Call once, make sure it is selected
- Call the second time, make sure the output is the same
- Add one more node to the InMemoryNodeManager
- Call the third time, make sure it has a new node selected
- Add one more node to the InMemoryNodeManager
- Call for the fourth time, make sure the latest added node is not selected, as the limit is reached
@arhimondr I couldn't understand the first test case. Select one node twice?
e.g.: provide only a single split. So only a single node of three is selected at the first step.
@arhimondr: This is off-topic for this PR. But here is the ancient discussion about …
I see your point... but anyhow, it's not a "common iterator" (the long name also suggests it 😉)
@wenleix thanks for your feedback! We are recomputing the nodes since the NodeMap gets refreshed every 5 seconds. New worker nodes can join while we are scheduling the splits. We could cache the nodes once they reach the limit, but this also makes the code more complicated and stateful. I think we can keep it as is (after adding @arhimondr's suggestions). I would like to start refactoring the interfaces after this PR :) The interface names don't make sense.
highker
left a comment
Didn't read the test. The e2e logic/interface looks good to me
Do we wanna do some sanity check on this value?
Correct, what I mean is we only do the recomputing when we don't have enough nodes to schedule. I think @arhimondr suggested some similar ideas. But I am open to waiting for the refactor/cleanup given the current interface is a bit messy already. Another idea is: can we skip this recomputing when …
Please use io.airlift.units.Duration instead.
See the 9th bullet point here: https://github.com/prestodb/presto/wiki/Presto-Development-Guidelines#development
The memoizeWithExpiration supplier doesn't allow 0 =\
I recommend having a condition here: if the duration is 0, create a regular supplier; otherwise, the memoizeWithExpiration one.
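A sketch of that guard (names are hypothetical; a tiny hand-rolled memoizer stands in for Guava's Suppliers.memoizeWithExpiration, which rejects non-positive durations):

```java
import java.util.function.Supplier;

public class NodeMapSupplierSketch
{
    // If the refresh interval is 0, return the plain supplier so the value is
    // recomputed on every call; otherwise wrap it in a time-based memoizer
    // (stand-in for Guava's memoizeWithExpiration, which doesn't allow 0).
    public static <T> Supplier<T> nodeMapSupplier(Supplier<T> delegate, long refreshMillis)
    {
        if (refreshMillis == 0) {
            return delegate;
        }
        return new Supplier<T>()
        {
            private T value;
            private long expiresAtMillis;

            @Override
            public synchronized T get()
            {
                long nowMillis = System.nanoTime() / 1_000_000;
                if (value == null || nowMillis >= expiresAtMillis) {
                    value = delegate.get();
                    expiresAtMillis = nowMillis + refreshMillis;
                }
                return value;
            }
        };
    }
}
```

With a zero duration the supplier degenerates to the delegate itself, which also sidesteps the nanoTime-resolution flakiness discussed below.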
We prefer immutable collections whenever possible. Please use ImmutableSet.copyOf().
See the second bullet point here: https://github.com/prestodb/presto/wiki/Presto-Development-Guidelines#development
The memoizeWithExpiration supplier uses System.nanoTime() to check whether the memoization duration has passed.
From the System.nanoTime() javadoc:
This method provides nanosecond precision, but not necessarily nanosecond resolution
It means that System.nanoTime() may not return an updated value even if a non-zero number of nanoseconds has passed. Although it is rather unlikely for the calls to take less than 1 ns, it is still possible for System.nanoTime() to not provide enough resolution on some platforms.
This behaviour can result in flakiness of this test. Thus I recommend having a
... condition here. If the duration is 0, then create a regular supplier. Otherwise the memoizeWithExpiration one ...
nit: new Duration(5, SECONDS)
memoizeWithExpiration - static import
The stage.max-tasks-per-stage configuration property can be used to limit the number of tasks for scan.
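For example, the property could be set in the coordinator's configuration like this (file location and value are illustrative; only the property name stage.max-tasks-per-stage comes from this PR):

```properties
# etc/config.properties — cap the number of tasks per stage for scans
stage.max-tasks-per-stage=100
```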
@highker, @wenleix, @arhimondr do you all have any additional comments?