-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-19601] [SQL] Fix CollapseRepartition rule to preserve shuffle-enabled Repartition #16933
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| // Case 1 | ||
| case Repartition(numPartitions, shuffle, Repartition(_, _, child)) => | ||
| case Repartition(numPartitions, shuffle, Repartition(_, shuffleChild, child)) | ||
| if shuffle == shuffleChild || shuffle => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, thank you for fixing this!
|
Test build #72894 has finished for PR 16933 at commit
|
| case plan => SubqueryAlias(alias, plan, None) | ||
| } | ||
|
|
||
| def coalesce(num: Integer): LogicalPlan = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it conflict with sql coalesce by having it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They are used for the test cases in catalyst package, in which Dataset APIs are not available. Thus, that is why we add these DSL for test cases
| * [[RepartitionByExpression]] with the expression and last number of partition. | ||
| * 3. When a shuffle-enabled [[Repartition]] is above a [[RepartitionByExpression]], collapse as a | ||
| * single [[RepartitionByExpression]] with the expression and the last number of partition. | ||
| * 4. When a [[RepartitionByExpression]] is above a [[Repartition]], collapse as a single |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does shuffle type matter for Repartition in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RepartitionByExpression always uses ShuffleExchange. Thus, it is like Repartition with enabled shuffle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, I was referring to shuffle on Repartition, but I see your point of RepartitionByExpression overriding it regardless
| test("collapse two adjacent coalesces into one") { | ||
| val query = testRelation | ||
| .coalesce(10) | ||
| .coalesce(20) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, I can see the argument.
but there are 2 adjacent coalesces like this shouldn't it take the smaller number? (since coalesce can't increase partition numbers)
whereas if there are 2 adjacent repartition it could take the last number
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to respect the later input number, which is specified by users, for avoiding any surprise to users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, agreed.
| .coalesce(20) | ||
|
|
||
| val optimized2 = Optimize.execute(query2.analyze) | ||
| val correctAnswer2 = testRelation.repartition(5).coalesce(20).analyze |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that might be the plan but the end result should be numPartitions == 5 correct? is there another suite we could add tests for repartition/coalesce like this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah. We can get rid of coalesce if the number of partitions is smaller than the child repartition
Actually, I can add some simple end-to-end test cases like what you did in the R side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For improving this rule, we need to clean up the resolution of RepartitionByExpression at first. See the PR #16988
|
Test build #73910 has finished for PR 16933 at commit
|
|
Test build #73911 has finished for PR 16933 at commit
|
| // Case 1: When a Repartition has a child of Repartition or RepartitionByExpression, | ||
| // we can collapse it with the child based on the type of shuffle and the specified number | ||
| // of partitions. | ||
| case r @ Repartition(_, _, child: Repartition) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can just write one case case r @ Repartition(_, _, child: RepartitionOperation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great!
| */ | ||
| private def collapseRepartition(r: Repartition, child: RepartitionOperation): LogicalPlan = { | ||
| (r.shuffle, child.shuffle) match { | ||
| case (false, true) => child match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this pattern match? we can just call child.numPartitions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes!
| * - Case 2 the top [[Repartition]] enables shuffle (i.e., repartition API): | ||
| * returns the child node with the last numPartitions. | ||
| */ | ||
| private def collapseRepartition(r: Repartition, child: RepartitionOperation): LogicalPlan = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we inline this method?
|
Test build #74045 has finished for PR 16933 at commit
|
|
Test build #74064 has finished for PR 16933 at commit
|
| .distribute('a)(20) | ||
|
|
||
| val optimized2 = Optimize.execute(query2.analyze) | ||
| val correctAnswer2 = testRelation.distribute('a)(20).analyze |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is same as correctAnswer1
| val optimized = Optimize.execute(query.analyze) | ||
| val correctAnswer = testRelation.distribute('a)(10).analyze | ||
| val optimized1 = Optimize.execute(query1.analyze) | ||
| val correctAnswer1 = testRelation.distribute('a)(10).analyze |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I not quite sure about this. Shall we optimize to relation.repartition(10)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, I just followed what we did before. After more code reading, I think we can do it, since RoundRobinPartitioning looks cheaper.
case logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
ShuffleExchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
} else {
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
}
case logical.RepartitionByExpression(expressions, child, numPartitions) =>
exchange.ShuffleExchange(HashPartitioning(
expressions, numPartitions), planLater(child)) :: NilThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My concern is, optimization should not change the result. relation.distributeBy('a, 10).repartition(10) should have same result of relation.repartition(10), instead of relation.distributeBy('a, 10). It's not about which one is cheaper, we should not surprise users.
|
|
||
| val query2 = testRelation | ||
| .coalesce(20) | ||
| .distribute('a)(30) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd like to make query2 as
testRelation
.coalesce(30)
.distribute('a)(20)
i.e. the numPartitions of coalesce is bigger than distribute
| } | ||
|
|
||
| test("repartition above repartitionBy") { | ||
| val query1 = testRelation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can add a comment: // Always respects the top repartition amd removes useless distribute below repartition
| test("repartition above repartitionBy") { | ||
| val query1 = testRelation | ||
| .distribute('a)(20) | ||
| .repartition(10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can still pick the same numPartition pairs: 10, 20 and 30, 20
| .distribute('a)(20) | ||
|
|
||
| val optimized2 = Optimize.execute(query2.analyze) | ||
| val correctAnswer2 = testRelation.distribute('a)(20).analyze |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's same with correctAnswer1
|
Let me re-write the whole test cases. Thanks! |
|
Test build #74137 has finished for PR 16933 at commit
|
|
LGTM, pending test |
|
Test build #74140 has finished for PR 16933 at commit
|
|
retest this please |
|
Test build #74157 has finished for PR 16933 at commit
|
|
retest this please |
|
Test build #74179 has finished for PR 16933 at commit
|
|
Thanks! Merging to master. |
|
awesome! |
What changes were proposed in this pull request?
Observed by @felixcheung in #16739, when users use the shuffle-enabled
repartitionAPI, they expect the partition they got should be the exact number they provided, even if they call shuffle-disabledcoalescelater.Currently,
CollapseRepartitionrule does not consider whether shuffle is enabled or not. Thus, we got the following unexpected result.This PR is to fix the issue. We preserve shuffle-enabled Repartition.
How was this patch tested?
Added a test case