[SPARK-17637][Scheduler]Packed scheduling for Spark tasks across executors #15541
Test build #67157 has finished for PR 15541 at commit
@rxin @gatorsmile Can you please take a look and kindly provide your comments.
How about this?

val assignerName = conf.get(config.SPARK_SCHEDULER_TASK_ASSIGNER.key, "roundrobin")
val className = assignerMap.getOrElse(assignerName.toLowerCase, roundrobin)
Put a log info or warn when the given assignerName is not correct, instead of silently turning to the default one.
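A minimal sketch of that suggestion, assuming a hypothetical assignerMap keyed by lowercase names (the class names and resolveAssigner helper below are illustrative, not the PR's actual constants):

```scala
// Hypothetical sketch: warn on an unknown assigner name instead of silently
// falling back. The map contents and resolveAssigner are illustrative.
val assignerMap: Map[String, String] = Map(
  "roundrobin" -> "org.apache.spark.scheduler.RoundRobinAssigner",
  "packed" -> "org.apache.spark.scheduler.PackedAssigner",
  "balanced" -> "org.apache.spark.scheduler.BalancedAssigner")

def resolveAssigner(assignerName: String): String =
  assignerMap.getOrElse(assignerName.toLowerCase, {
    // In Spark this would be logWarning; println stands in for the sketch.
    println(s"Unknown task assigner '$assignerName', falling back to roundrobin")
    assignerMap("roundrobin")
  })
```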
one line?
If I just read this function, my first question is how we can ensure this will not go out of bounds. We need to leave a comment to explain this, or add a safety check to avoid any bug we could introduce in the future.
Will change to make it similar to the Iterator.next() method, and add comments similar to those on Iterator.next().
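The Iterator.next()-style contract being discussed can be sketched as follows; OfferIterator and its element type are hypothetical stand-ins for the PR's internal structures:

```scala
// Hypothetical sketch mirroring Iterator.next(): callers must check hasNext
// first, and next() throws NoSuchElementException past the end, so an
// out-of-bounds access can never go unnoticed.
class OfferIterator[T](offers: IndexedSeq[T]) {
  private var i = 0
  def hasNext: Boolean = i < offers.length
  def next(): T = {
    if (!hasNext) throw new NoSuchElementException("no more offers")
    val offer = offers(i)
    i += 1
    offer
  }
}
```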
Two spaces between private and val.
You can remove line 109 after the following change:

assigner.withCpuPerTask(CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1))
If the current offer is not assigned, why do we need to step to the next offer if coresAvailable is still enough?
There are two cases:
a) There is no (or insufficient) locality information - in which case, what you describe will hold: all subsequent requests will also not result in assignment.
b) If there are other executors for which sufficient locality affinity holds, then a 'later' executor in the iteration order can satisfy the locality preference.
The assignment is decided by TaskSetManager eventually - the Assigner simply specifies the order in which iteration proceeds.
If the current offer is rejected, it is not valid for the current taskset (probably due to a locality restriction). Each scheduling algorithm has to respect the locality restriction and, in the meantime, provide the next available offer to the taskset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you will put offers into the PriorityQueue, is it still necessary to do shuffling?
These are the comments from @mridulm in the last PR. I think it is reasonable, but I don't have a concrete answer in mind:
"Would be good to shuffle workOffset's for this class too. Practically, this ensures that the initial heap will be randomized when cores are the same."
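The randomization being discussed could look like this minimal sketch, assuming a hypothetical Offer type; shuffling before building the heap breaks ties among executors with equal free cores:

```scala
import scala.collection.mutable.PriorityQueue
import scala.util.Random

// Hypothetical Offer type standing in for the PR's worker offers.
case class Offer(executorId: String, freeCores: Int)

// Shuffle before building the max-heap so that executors with equal free
// cores end up in a random initial order rather than a fixed one.
def buildHeap(offers: Seq[Offer], rng: Random): PriorityQueue[Offer] = {
  val shuffled = rng.shuffle(offers)
  PriorityQueue(shuffled: _*)(Ordering.by[Offer, Int](_.freeCores))
}
```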
It sounds correct. However, I don't think it has a real effect. If the cores are the same, it means no task was assigned in the previous run, so it doesn't matter whether we begin with a different order of offers.
After talking to several other people, they don't feel the shuffle is strongly needed. @mridulm If you don't mind, I will remove it in my next patch.
@zhzhan Can you elaborate on what the concerns with shuffling are?
There were various reasons why we started shuffling offers.
@mridulm I am not sure how much the shuffle can impact the scheduling, and thus I don't have a strong opinion on this.
@viirya Even if the cores are the same, it does not mean that "no task was assigned in the previous run". Shuffling does take effect here. For example, the previous round may be (5, 4, 3); after one core is allocated, the current round would be (4, 4, 3).
@zhzhan Let us preserve the behavior - for any application using this assigner, all tasksets will be executed based on the ordering of offers (both with and without good locality info).
The impact can be fairly non-trivial - which is why shuffling was initially added.
Use val and call clear in init below?
As pointed above, call clear?
As this assigner will try to pack as many tasks as possible into the same worker, the concern would be increasing memory pressure on the worker. Have you experienced such an issue in your practical usage?
Your concern is valid. Each scheduling algorithm has its pros and cons, and which one is chosen depends on the user's requirements. We mainly want to use this to save reserved resources combined with dynamic allocation. In our pipeline, we didn't observe the problem. If it happens, we need to investigate the memory allocation part to see whether it has a problem.
Do we need reset()? It looks like we only need init(). As we call init before each assignment, it should be enough to reset the state back to its initial values.
My concern is that if we do not use reset, the assigner has to keep its internal resources until next time, but it is not a big overhead.
Looks like you don't have a big object that poses a serious concern here.
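As a hedged illustration of the init-subsumes-reset lifecycle discussed above (the names are invented, not the PR's API):

```scala
// Invented sketch of the lifecycle under discussion: init() clears whatever
// the previous round left behind before loading the new offers, so a
// separate reset() call is unnecessary.
class AssignerState {
  val pending = scala.collection.mutable.ArrayBuffer.empty[Int]
  def init(offers: Seq[Int]): Unit = {
    pending.clear() // releases last round's state, subsuming reset()
    pending ++= offers
  }
}
```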
"this worker"? this won't represent a worker here.
nit: requested => requests
nit: Tracking => Tracks
nit: different the locality restrictions => different locality restrictions
It might be better to put these into separate points rather than a paragraph. Also, I am not sure what the protocol is about putting details like method names in the doc. As things stand, it will serve people trying to read the code well, but as the codebase evolves, things might get out of sync if this comment is not updated.
docs/configuration.md
Outdated
nit: create a list for each policy and explain inline instead of saying former, latter below.
nit: extra space in private val
nit: space after while
Shouldn't you add it to the heap anyway, regardless of what assigned is set to?
If it is rejected, it is not valid for this round of assignment for this specific task set anymore, because it means it is not valid for any task in the task set.
Same as before: shouldn't you add it to the heap anyway, regardless of what assigned is set to?
Similar to the reason above: if the offer is rejected, we have to move forward.
docs/configuration.md
Outdated
nit: packed and balanced are provided.
Test build #67224 has finished for PR 15541 at commit
Sounds like the shuffling reason for BalancedAssigner can be applied here too? If shuffling, this ensures that the initially sorted offers will be randomized when cores are the same, right?
@viirya Agree, good point.
Test build #67290 has finished for PR 15541 at commit

@rxin Can you please take a look, and let me know if you have any concerns?

Accidentally, I deleted all my comments. You might need to check the emails to find all my comments. :)
Is this class private to the scheduler?

@gatorsmile I didn't see your new comments
perform -> performs
invoke -> invokes
@return is not aligned with the line above.
Whether -> whether
It -> In
I remember that in your original PR, there was a resource release method. Do you still need it?
Based on the review comments, we do not need it anymore. The resource will be released in the init method.
docs/configuration.md
Outdated
Missing space between . and The
Test build #67395 has finished for PR 15541 at commit

Test build #67428 has finished for PR 15541 at commit

@rxin Would you like to take a look and let me know if you have any concerns? Thanks.

retest this please

Sure, will take a look in the next couple of days to get this into 2.1 if possible.
what's the difference between this and packed? Wouldn't they look similar? Why would anyone use one over another?
@rxin These two do the opposite thing. The packed assigner tries to schedule tasks onto as few workers as possible, so that workers with no tasks running can be released. The balanced assigner tries to schedule tasks onto the workers with the least workload.
If users want optimal resource reservation, they may want the packed assigner. If users observe memory pressure, they may want to try the balanced assigner.
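The contrast between the two policies can be sketched with two orderings over a hypothetical executor type; neither name matches the PR's actual classes:

```scala
// Hypothetical executor bookkeeping; the type and orderings are illustrative,
// not the PR's actual classes.
case class Exec(id: String, freeCores: Int)

// "packed" prefers the executor with the fewest free cores (fill it up so
// idle executors can be released); "balanced" prefers the most free cores.
val packedOrder: Ordering[Exec] = Ordering.by[Exec, Int](_.freeCores)
val balancedOrder: Ordering[Exec] = packedOrder.reverse

def pick(execs: Seq[Exec], ord: Ordering[Exec]): Exec = execs.min(ord)
```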
Sorry, I made a mistake -- I meant to ask the difference between balanced and round robin. Aren't the two similar?
These two assigners may behave similarly in practice. The difference is that the balanced assigner tries to distribute the workload more aggressively.
Test build #67863 has finished for PR 15541 at commit
i'd actually fail Spark if it cannot be constructed -- otherwise it is easier to make mistakes.
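A fail-fast construction sketch along these lines, using only standard Java reflection (the instantiateAssigner helper name is hypothetical):

```scala
// Hypothetical fail-fast construction: any bad class name surfaces as an
// exception at startup instead of being masked by a silent default.
def instantiateAssigner(className: String): AnyRef =
  Class.forName(className).getDeclaredConstructor().newInstance()
```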
To be honest, I find the current API pretty weird (it is a stateful object that has to be reset every time). I suspect you designed this API by just abstracting out the logic you wanted to change from the existing implementation, but that doesn't necessarily lead to intuitive APIs. It's been a while since I last checked the scheduler code, so it'd take me a while to page back in.
docs/configuration.md
Outdated
Nit: I suggest double quote the keywords "roundrobin", "packed", and "balanced" in this paragraph. E.g. the "balanced" task assigner sounds better to me than the balanced task assigner.
The comments on the resourceOffers method should be updated. They still say "We fill each node with tasks in a round-robin manner so that tasks are balanced across the cluster."
@rxin Thanks for the feedback regarding the TaskAssigner API. The current API is designed based on the current logic of TaskSchedulerImpl, where the scheduler takes many rounds to assign the tasks for each task set. I have not figured out a better way yet. Any suggestions are welcome.
Test build #67924 has finished for PR 15541 at commit

Test build #69015 has finished for PR 15541 at commit

Test build #72506 has finished for PR 15541 at commit

Test build #97708 has started for PR 15541 at commit

Test build #97724 has started for PR 15541 at commit

Test build #97780 has started for PR 15541 at commit

Build finished. Test FAILed.
What changes were proposed in this pull request?
Restructure the code and implement two new task assigners.
PackedAssigner: tries to allocate tasks to the executors with the fewest available cores, so that Spark can release reserved executors when dynamic allocation is enabled.
BalancedAssigner: tries to allocate tasks to the executors with the most available cores, in order to balance the workload across all executors.
By default, the original round-robin assigner is used.
We tested a pipeline, and the new PackedAssigner saves around 45% of reserved CPU and memory with dynamic allocation enabled.
How was this patch tested?
Both unit tests in TaskSchedulerImplSuite and manual tests in a production pipeline.