[SPARK-23496][CORE] Locality of coalesced partitions can be severely skewed by the order of input partitions #20664
Conversation
mgaido91
left a comment
As I commented on the JIRA, I'd prefer another approach to tackle this.
Anyway, if that is not feasible and we go ahead with this approach, then as it stands this is a potential source of varying processing times in users' workflows: previously, a given flow was likely to always take the same time to run, whereas with this change the same job can show two very different timings. I am wondering if we can give the user some control over this, like a config property for setting the seed. What do you think?
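(For illustration only, a minimal sketch of what such a knob could look like; the config key and wiring below are hypothetical and do not exist in Spark.)

```scala
// Hypothetical sketch: "spark.rdd.coalesce.seed" is an invented key, not a real Spark config.
import org.apache.spark.SparkConf

val conf = new SparkConf()
// Fall back to a hard-coded seed when the user does not set anything.
val seed = conf.getLong("spark.rdd.coalesce.seed", 7919L)
val rnd = new scala.util.Random(seed) // the same seed yields the same partition grouping
```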
```scala
  }.collect()
}

test("SPARK-23496: order of input partitions can result in severe skew in coalesce")
```
this test looks to me like a good candidate for flakiness, since we are picking random numbers. Can we set the seed in order to avoid this?
The test is in fact deterministic. The seed is already fixed here:
```scala
val rnd = new scala.util.Random(7919) // keep this class deterministic
```
I see, thanks, sorry, I missed it
Thanks for the comments. I don't think users should be impacted by changing execution times: if the parameters of the job are constant, then the partition allocation should also be deterministic, since the seed is fixed.

TBH, I'm just trying to merge upstream a fix we've implemented for the client. I agree much more could be done to improve coalesce, and if someone is interested in looking into it, I'm all for it.
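(A tiny illustration of the determinism argument: a fixed-seed `scala.util.Random` produces the same sequence on every run, so constant job parameters lead to the same partition grouping.)

```scala
// With an identical seed, two Random instances generate identical sequences,
// so repeated runs of the same job make the same "random" choices.
val a = new scala.util.Random(7919)
val b = new scala.util.Random(7919)
assert(Seq.fill(5)(a.nextInt(100)) == Seq.fill(5)(b.nextInt(100)))
```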
Test build #87632 has finished for PR 20664 at commit
```diff
 while (numCreated < targetLen) {
-  val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
-  tries += 1
+  val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(
```
Perhaps add a comment to explain the purpose of this change here?
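(For readers skimming the diff, here is a rough, self-contained sketch of the idea behind the change; the types and names are simplified stand-ins inferred from the snippet above, not the exact PR code.)

```scala
// Simplified stand-in for the duplicate-location loop in setupGroups.
// partsWithLocs pairs a preferred host with an input partition index; the first
// half of the input is clustered on hostA, the second half on hostB.
val partsWithLocs: Array[(String, Int)] =
  Array.tabulate(100)(i => (if (i < 50) "hostA" else "hostB", i))
val targetLen = 50
val rnd = new scala.util.Random(7919)

// Old behaviour: walk partsWithLocs in order, so the clustered hostA entries dominate.
val sequentialPicks = (0 until targetLen).map(tries => partsWithLocs(tries)._1)

// New behaviour: draw a random index each time, spreading picks across the whole input.
val randomPicks = (0 until targetLen).map(_ => partsWithLocs(rnd.nextInt(partsWithLocs.length))._1)

println(sequentialPicks.count(_ == "hostA")) // 50: every pick comes from the hostA cluster
println(randomPicks.count(_ == "hostA"))     // roughly 25: close to the input's 50/50 split
```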
```scala
// Without the fix these would be:
// numPartsPerLocation(locations(0)) == numCoalescedPartitions - 1
// numPartsPerLocation(locations(1)) == 1
assert(numPartsPerLocation(locations(0)) > 0.4 * numCoalescedPartitions)
```
How confident are we that the assert condition will hold? How was the fraction 0.4 chosen?
nvm, the result is deterministic.
Added comment about flakiness & fixed seed.
```scala
  .groupBy(identity)
  .mapValues(_.size)

// Without the fix these would be:
```
Normally we don't write comments this way; maybe just say that we want to ensure the location preferences are distributed evenly.
Done.
This LGTM overall, just some nits.

Test build #87672 has finished for PR 20664 at commit

retest this please

Test build #87697 has finished for PR 20664 at commit

retest this please

Test build #87715 has finished for PR 20664 at commit

retest this please

Test build #87771 has finished for PR 20664 at commit

retest this please

Test build #87827 has finished for PR 20664 at commit

retest this please

1 similar comment

retest this please

Test build #87954 has finished for PR 20664 at commit

Merging to master. Thanks! @mgaido91 if you feel this should be different, feel free to open a follow-up.
Author: Ala Luszczak <[email protected]>

Closes apache#20664 from ala/SPARK-23496.
What changes were proposed in this pull request?
The algorithm in `DefaultPartitionCoalescer.setupGroups` is responsible for picking preferred locations for coalesced partitions. It analyzes the preferred locations of input partitions. It starts by trying to create one partition for each unique location in the input. However, if the requested number of coalesced partitions is higher than the number of unique locations, it has to pick duplicate locations.

Previously, the duplicate locations would be picked by iterating over the input partitions in order and copying their preferred locations to coalesced partitions. If the input partitions were clustered by location, this could result in severe skew.
With the fix, instead of iterating over the list of input partitions in order, we pick them at random. It's not perfectly balanced, but it's much better.
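The effect can be illustrated with a small standalone sketch (invented host names and simplified data structures, not the actual `DefaultPartitionCoalescer` internals):

```scala
import scala.util.Random

// 100 input partitions clustered by location: 90 prefer hostA, 10 prefer hostB.
val inputLocs = (0 until 100).map(i => if (i < 90) "hostA" else "hostB")
val numCoalesced = 20

// One coalesced partition per unique location first; the rest are duplicates.
val unique = inputLocs.distinct
val duplicatesNeeded = numCoalesced - unique.size

// Before the fix: duplicates taken from the input in order, so hostA dominates.
val before = unique ++ inputLocs.take(duplicatesNeeded)

// After the fix: duplicates picked at random indices, spreading them across the input.
val rnd = new Random(7919) // fixed seed keeps the grouping deterministic across runs
val after = unique ++ Seq.fill(duplicatesNeeded)(inputLocs(rnd.nextInt(inputLocs.length)))

println(before.groupBy(identity).mapValues(_.size).toMap) // Map(hostA -> 19, hostB -> 1)
println(after.groupBy(identity).mapValues(_.size).toMap)  // roughly proportional to the input
```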
How was this patch tested?
A unit test reproducing the behavior was added.
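A hedged sketch of what such a regression check can look like when written against the public RDD API (host names, sizes, and the 0.4 threshold mirror the discussion above, but this is not the exact test added by the PR):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of a skew regression check: input partitions are clustered by preferred
// location (first half hostA, second half hostB); after coalescing, neither host
// should dominate the coalesced partitions' preferred locations.
object CoalesceSkewCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("coalesce-skew"))
    try {
      val numInput = 100
      val numCoalesced = 50
      // makeRDD accepts (value, preferredLocations) pairs and creates one partition per element.
      val data = (0 until numInput).map { i =>
        (i, Seq(if (i < numInput / 2) "hostA" else "hostB"))
      }
      val coalesced = sc.makeRDD(data).coalesce(numCoalesced)
      val numPartsPerLocation = coalesced.partitions
        .map(p => coalesced.preferredLocations(p).headOption.getOrElse("none"))
        .groupBy(identity)
        .mapValues(_.size)
      // Without the fix, one location would get numCoalesced - 1 of the groups.
      assert(numPartsPerLocation("hostA") > 0.4 * numCoalesced)
      assert(numPartsPerLocation("hostB") > 0.4 * numCoalesced)
    } finally {
      sc.stop()
    }
  }
}
```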