[SPARK-24087][SQL] Avoid shuffle when join keys are a super-set of bucket keys #21156
Conversation
Test build #89847 has finished for PR 21156 at commit
Test build #89867 has finished for PR 21156 at commit
Test build #89869 has finished for PR 21156 at commit
retest this please
Test build #89875 has finished for PR 21156 at commit
Test build #91434 has finished for PR 21156 at commit
Test build #91435 has finished for PR 21156 at commit
Test build #91436 has finished for PR 21156 at commit
Test build #92669 has finished for PR 21156 at commit
I ran the failed commands locally with no issues; retesting again.
retest this please
@cloud-fan @gatorsmile @mgaido91 @viirya Could you help review this feature?
Test build #92674 has finished for PR 21156 at commit
retest this please
Test build #92682 has finished for PR 21156 at commit
    if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
      avoidShuffleIfPossible(leftKeys, leftExpressions)
    case _ => rightPartitioning match {
IIUC if either left or right are not HashPartitioning we are sure we won't meet the required distribution, so I guess this is useless, isn't it?
Yes, you are right. The main purpose of this feature is bucketed tables, so HashPartitioning is enough.
Actually, in a similar way we could also skip the shuffle for one side when it is RangePartitioning, but I am not sure whether that is really useful.
But that case would not be covered here anyway: since we return a HashClusteredDistribution requirement, a RangePartitioning would never match, would it?
In that case, we can return OrderedDistribution :: OrderedDistribution :: Nil to avoid shuffle for the RangePartitioning side.
yes, we can do that, but anyway this case is useless...
    // Enable it after fix https://issues.apache.org/jira/browse/SPARK-12704
    ignore("avoid shuffle when join keys are a super-set of bucket keys") {
    test("avoid shuffle when join keys are a super-set of bucket keys") {
can we add more tests with different BucketSpec on the two sides?
    private def avoidShuffleIfPossible(
        joinKeys: Seq[Expression],
        expressions: Seq[Expression]): Seq[Distribution] = {
      val indices = expressions.map(x => joinKeys.indexWhere(_.semanticEquals(x)))
What if we don't find an expression here? I think indexWhere would return -1, causing an error when the index is used later. Can we also add a test case for this situation?
case HashPartitioning(leftExpressions, _)
if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) =>
avoidShuffleIfPossible(leftKeys, leftExpressions)
The guard if leftPartitioning.satisfies(ClusteredDistribution(leftKeys)) has already ensured that expressions is a subset of joinKeys, so it would not return -1, right?
yes, you're right, thanks.
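The guard discussed above can be illustrated with a small, Spark-free sketch (plain Scala; semanticEquals is replaced by plain string equality, and all names here are illustrative, not from the PR):

```scala
object IndexLookupSketch {
  // Maps each partitioning expression to its position among the join keys,
  // mirroring the indexWhere call in avoidShuffleIfPossible. Without the
  // satisfies(ClusteredDistribution(...)) guard, an expression absent from
  // the join keys would map to -1 and break any later positional lookup.
  def indicesOf(joinKeys: Seq[String], expressions: Seq[String]): Seq[Int] =
    expressions.map(x => joinKeys.indexWhere(_ == x))

  def main(args: Array[String]): Unit = {
    // Bucket key a1 is found among join keys (a1, a2) at index 0.
    println(indicesOf(Seq("a1", "a2"), Seq("a1")))
    // An expression missing from the join keys yields -1.
    println(indicesOf(Seq("a1", "a2"), Seq("c1")))
  }
}
```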
@mgaido91 With this approach, it seems we don't need
Test build #92754 has finished for PR 21156 at commit
@cloud-fan For a bucketed table, users typically bucket on the primary key, so in this case they will not hit the parallelism and data-skew issues, and we can see a good benefit from avoiding the shuffle.
A classic scenario could be like below:
Test build #92810 has finished for PR 21156 at commit
Test build #92809 has finished for PR 21156 at commit
IMHO, this ShuffledJoin is basically a join plus known distribution info. So instead of adding another join node (which doesn't map to any specific join algorithm), can we try to return the right distribution for bucketed tables?
Test build #92822 has finished for PR 21156 at commit
@maryannxue how about this way? Any better idea?
Test build #92909 has finished for PR 21156 at commit
Closed by mistake; reopening it.
Test build #93601 has finished for PR 21156 at commit
What is the status now? I think this is of great value: it gives users more opportunities to leverage bucket joins, since all joins that take the bucket key as a prefix of the join keys will benefit.
In this case, only table B needs an extra shuffle; the shuffle keys are (b1, b2), and the shuffle partition number is table A's bucket number.
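As a hypothetical illustration of this scenario (table names, columns, and the bucket count are assumptions, not from the PR):

```sql
-- Table A is bucketed by a1; table B is not bucketed.
CREATE TABLE a (a1 INT, a2 INT) USING parquet CLUSTERED BY (a1) INTO 8 BUCKETS;
CREATE TABLE b (b1 INT, b2 INT) USING parquet;

-- The join keys (a1, a2) are a super-set of A's bucket key (a1), so with this
-- change A's existing bucketing is reused and only B is shuffled, on (b1, b2),
-- into A's bucket number of partitions.
SELECT * FROM a JOIN b ON a.a1 = b.b1 AND a.a2 = b.b2;
```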
Test build #97825 has started for PR 21156 at commit
Test build #97855 has started for PR 21156 at commit
Test build #97872 has started for PR 21156 at commit
Sorry for the delay. I’ll take another look today.
The idea is good. Is it possible to make it an optimization rule? Another suggestion: we need more test cases.
    }
    val leftPartitioning = left.outputPartitioning
    val rightPartitioning = right.outputPartitioning
This is my biggest concern. Currently Spark adds shuffle with a rule, so we can't always get the children partitioning precisely. We implemented a similar feature in EnsureRequirements.reorderJoinPredicates, which is hacky and we should improve the framework before adding more features like this.
@cloud-fan In this PR, requiredChildDistribution is re-calculated each time it is invoked; could it be more precise than EnsureRequirements.reorderJoinPredicates?
This kind of bucket join is common; do we have a plan to improve the framework in 3.0?
Can one of the admins verify this patch?
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
To improve bucket joins, avoid the shuffle when the join keys are a super-set of the bucket keys.
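A minimal, Spark-free model of the proposed behavior (plain Scala; the class names mimic Spark's but are simplified stand-ins, and the helper below is an assumption for illustration, not the PR's actual code):

```scala
// Simplified stand-ins for Spark's partitioning/distribution classes.
case class HashPartitioning(expressions: Seq[String], numPartitions: Int)
sealed trait Distribution
case class ClusteredDistribution(keys: Seq[String]) extends Distribution

object BucketJoinModel {
  // If the left side is already hash-partitioned on a subset of the join keys
  // (e.g. a bucketed table), require only the corresponding subset of keys on
  // both sides: the left side already satisfies its requirement, so only the
  // right side needs a shuffle. Otherwise require all join keys on both sides.
  def requiredDistributions(
      leftKeys: Seq[String],
      rightKeys: Seq[String],
      leftPartitioning: Option[HashPartitioning]): Seq[Distribution] =
    leftPartitioning match {
      case Some(HashPartitioning(exprs, _))
          if exprs.nonEmpty && exprs.forall(leftKeys.contains) =>
        // Map each partitioning expression to its join-key position.
        val indices = exprs.map(e => leftKeys.indexWhere(_ == e))
        Seq(ClusteredDistribution(indices.map(leftKeys)),
            ClusteredDistribution(indices.map(rightKeys)))
      case _ =>
        Seq(ClusteredDistribution(leftKeys), ClusteredDistribution(rightKeys))
    }

  def main(args: Array[String]): Unit = {
    // Left is bucketed by a1; join keys are (a1, a2) vs (b1, b2). Only a
    // clustering on b1 is required of the right side.
    println(requiredDistributions(
      Seq("a1", "a2"), Seq("b1", "b2"), Some(HashPartitioning(Seq("a1"), 8))))
  }
}
```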
How was this patch tested?
Enable ignored test.