-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-25352][SQL] Perform ordered global limit when limit number is bigger than topKSortFallbackThreshold #22344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #95737 has finished for PR 22344 at commit
|
|
retest this please. |
|
Test build #95741 has finished for PR 22344 at commit
|
|
Test build #95746 has finished for PR 22344 at commit
|
|
Test build #95966 has finished for PR 22344 at commit
|
|
retest this please. |
|
Test build #95972 has finished for PR 22344 at commit
|
|
retest this please. |
|
Test build #95974 has finished for PR 22344 at commit
|
|
retest this please... |
|
Test build #95977 has finished for PR 22344 at commit
|
|
ping @cloud-fan seems to me we should consider to include this in 2.4. |
|
thanks, merging to master/2.4! |
…bigger than topKSortFallbackThreshold ## What changes were proposed in this pull request? We have optimization on global limit to evenly distribute limit rows across all partitions. This optimization doesn't work for ordered results. For a query ending with sort + limit, in most cases it is performed by `TakeOrderedAndProjectExec`. But if limit number is bigger than `SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD`, global limit will be used. At this moment, we need to do ordered global limit. ## How was this patch tested? Unit tests. Closes #22344 from viirya/SPARK-25352. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 2f42239) Signed-off-by: Wenchen Fan <[email protected]>
| if limit < conf.topKSortFallbackThreshold => | ||
| TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil | ||
| case Limit(IntegerLiteral(limit), s@Sort(order, true, child)) => | ||
| if (limit < conf.topKSortFallbackThreshold) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@viirya sorry to be a little late to the party. This pattern is repeated 4x can you just move it into a helper function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hvanhovell OK. I will create a follow-up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, please add space in-between s and @.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw here we really need to document what the strategies are. when there were only two cases it's not a big deal because it'd take a few seconds to understand. but this block is pretty large now that's difficult to understand. see join strategy documentation for example.
…bigger than topKSortFallbackThreshold ## What changes were proposed in this pull request? We have optimization on global limit to evenly distribute limit rows across all partitions. This optimization doesn't work for ordered results. For a query ending with sort + limit, in most cases it is performed by `TakeOrderedAndProjectExec`. But if limit number is bigger than `SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD`, global limit will be used. At this moment, we need to do ordered global limit. ## How was this patch tested? Unit tests. Closes apache#22344 from viirya/SPARK-25352. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
| */ | ||
| case class GlobalLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode { | ||
| case class GlobalLimitExec(limit: Int, child: SparkPlan, | ||
| orderedLimit: Boolean = false) extends UnaryExecNode { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does orderedLimit mean here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It means this global limit won't change input data order. This is used on sort + limit case which is usually taken by TakeOrderedAndProjectExec at most of time.
But if limit number is more than SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD, it's not goes for TakeOrderedAndProjectExec.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you mean by "it's not goes for TakeOrderedAndProjectExec"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When limit number is more than SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD, the planner won't choose TakeOrderedAndProjectExec to perform the sort + limit operation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please document it in code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I see. I will document it in the pr.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
code needs to be documented. we won't find this pr discussion a year from now by looking at the source code, trying to figure out what it means. also the doc needs to be readable. the current doc for the config flag is unfortunately unparsable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Let me submit a pr later to address those document. Really appreciate your comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks @viirya
can you write a design doc or put it in the classdoc of limit on how we handle limits? your sequence of prs are making limits much more complicated (with optimizations) and very difficult to reason about. i think we can make it easier to reason about, if we actually document the execution strategy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. I will do it too.
|
guys - the whole sequence of prs for this feature are contributing a lot of cryptic code with arcane documentation everywhere. i worry a lot about the maintainability of the code that's coming in. can you submit a pr to improve the readability? otherwise i think we should revert all of them. please document the algorithm, the change, the config flags. it shouldn't require reading the actual implementation to understand the config flags. we as committers should also do a better job at gating unreadable code. |
|
Thanks @rxin. I will submit a pr to improve the readability. |
## What changes were proposed in this pull request? This goes to revert sequential PRs based on some discussion and comments at #16677 (comment). #22344 #22330 #22239 #16677 ## How was this patch tested? Existing tests. Closes #22481 from viirya/revert-SPARK-19355-1. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 89671a2) Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request? This goes to revert sequential PRs based on some discussion and comments at #16677 (comment). #22344 #22330 #22239 #16677 ## How was this patch tested? Existing tests. Closes #22481 from viirya/revert-SPARK-19355-1. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
|
@viirya I meet the same problem when running "insert owerwrite xx select xx from xx sort by xx limit xx". which return a wrong answer, How does this pr going after revert? |

What changes were proposed in this pull request?
We have optimization on global limit to evenly distribute limit rows across all partitions. This optimization doesn't work for ordered results.
For a query ending with sort + limit, in most cases it is performed by
TakeOrderedAndProjectExec.But if limit number is bigger than
SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD, global limit will be used. At this moment, we need to do ordered global limit.How was this patch tested?
Unit tests.