-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-29552][SQL] Execute the "OptimizeLocalShuffleReader" rule when creating new query stage and then can optimize the shuffle reader to local shuffle reader as much as possible. #26207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@cloud-fan Please help me review. Also thanks for your offline help. |
|
ok to test |
|
add to whitelist |
| @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( | ||
| ReuseAdaptiveSubquery(conf, subqueryCache), | ||
| // Here we need put the OptimizeLocalShuffleReader rule before | ||
| // ReduceNumShufflePartitions rule to avoid the further optimizaiton. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the comment needs to explain 2 things:
- why execute this rule twice
- why it must be run before
OptimizeLocalShuffleReader
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
|
Test build #112459 has finished for PR 26207 at commit
|
| assert(bhj.size == 3) | ||
| // additional shuffle exchange introduced, only one shuffle reader to local shuffle reader. | ||
| checkNumLocalShuffleReaders(adaptivePlan, 1) | ||
| checkNumLocalShuffleReaders(adaptivePlan, 2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to confirm, would the change make this value consistently be 2? Because the value has changed to 2 but the value was actually flaky (neither 1 or 2 consistently) depending on the situation/randomness (maybe).
You may want to run the same for what I've discovered, 1) solely in local dev, 2) test suite in local dev, 3) trigger CI for 5 times or alike.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@HeartSaVioR With this patch, the value will consistently be 2. Because we already optimize all the possible local shuffle reader. And I have run in local dev and also the test suite, the value are all 2. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK thanks for confirming.
|
I think this PR title is not accurate as this is not just fix for flaky test, right? |
|
@viirya updated the title. Thanks. |
| // optimizations should be stage-independent. | ||
| @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( | ||
| ReuseAdaptiveSubquery(conf, subqueryCache), | ||
| // We will revert the all local shuffle reader node in OptimizeLocalShuffleReader rule |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To polish it a little bit:
When adding local shuffle readers in `OptimizeLocalShuffleReader`, we revert all the local
readers if additional shuffles are introduced. This may be too conservative: maybe there is
only one local reader that introduces shuffle, and we can still keep other local readers.
Here we re-execute this rule with the sub-plan-tree of a query stage, to make sure necessary
local readers are added before executing the query stage.
This rule must be executed before `ReduceNumShufflePartitions`, as local shuffle readers can't
change number of partitions.
…e twice and before ReduceNumShufflePartitions
|
Test build #112511 has finished for PR 26207 at commit
|
|
Test build #112516 has finished for PR 26207 at commit
|
|
retest this please |
|
Test build #112517 has finished for PR 26207 at commit
|
|
thanks, merging to master! |
What changes were proposed in this pull request?
OptimizeLocalShuffleReaderrule is very conservative and gives up optimization as long as there are extra shuffles introduced. It's very likely that most of the added local shuffle readers are fine and only one introduces extra shuffle.However, it's very hard to make
OptimizeLocalShuffleReaderoptimal, a simple workaround is to run this rule again right before executing a query stage.Why are the changes needed?
Optimize more shuffle reader to local shuffle reader.
Does this PR introduce any user-facing change?
No
How was this patch tested?
existing ut