-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-19122][SQL] Unnecessary shuffle+sort added if join predicates ordering differ from bucketing and sorting order #16985
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
217a843
d11fe32
060730c
213c273
3f27ff2
c36cd1a
acd4dcb
b295093
7171b58
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,94 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.joins | ||
|
|
||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.Expression | ||
| import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} | ||
| import org.apache.spark.sql.catalyst.rules.Rule | ||
| import org.apache.spark.sql.execution.SparkPlan | ||
|
|
||
| /** | ||
| * When the physical operators are created for JOIN, the ordering of join keys is based on order | ||
| * in which the join keys appear in the user query. That might not match with the output | ||
| * partitioning of the join node's children (thus leading to extra sort / shuffle being | ||
| * introduced). This rule will change the ordering of the join keys to match with the | ||
| * partitioning of the join nodes' children. | ||
| */ | ||
| class ReorderJoinPredicates extends Rule[SparkPlan] { | ||
|
|
||
|   /** | ||
|    * Reorders `leftKeys` and `rightKeys` in lockstep so that the keys of one side are listed | ||
|    * in the same order as the expressions of that side's [[HashPartitioning]]. | ||
|    * | ||
|    * The left child's partitioning is tried first and the right child's second. A side | ||
|    * qualifies only if its partitioning is a [[HashPartitioning]] with as many expressions as | ||
|    * there are join keys and every join key is semantically equal to one of those expressions. | ||
|    * The keys are returned unchanged if any key is non-deterministic or no side qualifies. | ||
|    * | ||
|    * @param leftKeys join keys of the left child; `leftKeys(i)` is paired with `rightKeys(i)` | ||
|    * @param rightKeys join keys of the right child | ||
|    * @param leftPartitioning output partitioning of the left child | ||
|    * @param rightPartitioning output partitioning of the right child | ||
|    * @return the (left, right) join keys, possibly permuted, with the pairing preserved | ||
|    */ | ||
|   private def reorderJoinKeys( | ||
|       leftKeys: Seq[Expression], | ||
|       rightKeys: Seq[Expression], | ||
|       leftPartitioning: Partitioning, | ||
|       rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = { | ||
|
|
||
|     // Permutes both key sequences so that `currentOrderOfKeys` follows `expectedOrderOfKeys`. | ||
|     // Each position of `currentOrderOfKeys` is consumed at most once, so duplicated keys keep | ||
|     // their own partner on the other side. Returns None when an expected expression has no | ||
|     // unused match (e.g. keys (a, a) against partitioning (a, b)) rather than indexing with -1. | ||
|     def reorder( | ||
|         expectedOrderOfKeys: Seq[Expression], | ||
|         currentOrderOfKeys: Seq[Expression]): Option[(Seq[Expression], Seq[Expression])] = { | ||
|       val unusedIndices = ArrayBuffer(currentOrderOfKeys.indices: _*) | ||
|       val leftKeysBuffer = ArrayBuffer[Expression]() | ||
|       val rightKeysBuffer = ArrayBuffer[Expression]() | ||
|
|
||
|       // `forall` stops at the first expected expression that cannot be matched. | ||
|       val allMatched = expectedOrderOfKeys.forall { expression => | ||
|         val position = | ||
|           unusedIndices.indexWhere(i => currentOrderOfKeys(i).semanticEquals(expression)) | ||
|         if (position >= 0) { | ||
|           val index = unusedIndices.remove(position) | ||
|           leftKeysBuffer.append(leftKeys(index)) | ||
|           rightKeysBuffer.append(rightKeys(index)) | ||
|         } | ||
|         position >= 0 | ||
|       } | ||
|       if (allMatched) Some((leftKeysBuffer, rightKeysBuffer)) else None | ||
|     } | ||
|
|
||
|     // Attempts the reordering against one child's partitioning; None if it does not qualify. | ||
|     def reorderToMatch( | ||
|         partitioning: Partitioning, | ||
|         keys: Seq[Expression]): Option[(Seq[Expression], Seq[Expression])] = { | ||
|       partitioning match { | ||
|         case HashPartitioning(expressions, _) | ||
|           if expressions.length == keys.length && | ||
|             keys.forall(key => expressions.exists(_.semanticEquals(key))) => | ||
|           reorder(expressions, keys) | ||
|         case _ => None | ||
|       } | ||
|     } | ||
|
|
||
|     if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) { | ||
|       reorderToMatch(leftPartitioning, leftKeys) | ||
|         .orElse(reorderToMatch(rightPartitioning, rightKeys)) | ||
|         .getOrElse((leftKeys, rightKeys)) | ||
|     } else { | ||
|       (leftKeys, rightKeys) | ||
|     } | ||
|   } | ||
|
|
||
|   /** | ||
|    * Rebuilds, via `transformUp`, every broadcast hash join, shuffled hash join and sort merge | ||
|    * join in `plan` with its join keys passed through `reorderJoinKeys`, using the output | ||
|    * partitioning of the join's children. Operators of any other kind are not matched. | ||
|    */ | ||
|   def apply(plan: SparkPlan): SparkPlan = plan.transformUp { | ||
|     case BroadcastHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left, right) => | ||
|       val (reorderedLeftKeys, reorderedRightKeys) = | ||
|         reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning) | ||
|       BroadcastHashJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, buildSide, condition, | ||
|         left, right) | ||
|
|
||
|     case ShuffledHashJoinExec(leftKeys, rightKeys, joinType, buildSide, condition, left, right) => | ||
|       val (reorderedLeftKeys, reorderedRightKeys) = | ||
|         reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning) | ||
|       ShuffledHashJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, buildSide, condition, | ||
|         left, right) | ||
|
|
||
|     case SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, left, right) => | ||
|       val (reorderedLeftKeys, reorderedRightKeys) = | ||
|         reorderJoinKeys(leftKeys, rightKeys, left.outputPartitioning, right.outputPartitioning) | ||
|       SortMergeJoinExec(reorderedLeftKeys, reorderedRightKeys, joinType, condition, left, right) | ||
|   } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -254,12 +254,14 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { | |
| bucketedTableTestSpecLeft: BucketedTableTestSpec, | ||
| bucketedTableTestSpecRight: BucketedTableTestSpec, | ||
| joinType: String = "inner", | ||
| joinCondition: (DataFrame, DataFrame) => Column): Unit = { | ||
| joinCondition: (DataFrame, DataFrame) => Column, | ||
| expectedResult: Option[Array[Row]] = None): Array[Row] = { | ||
| val BucketedTableTestSpec(bucketSpecLeft, numPartitionsLeft, shuffleLeft, sortLeft) = | ||
| bucketedTableTestSpecLeft | ||
| val BucketedTableTestSpec(bucketSpecRight, numPartitionsRight, shuffleRight, sortRight) = | ||
| bucketedTableTestSpecRight | ||
|
|
||
| var result: Array[Row] = Array.empty | ||
| withTable("bucketed_table1", "bucketed_table2") { | ||
| def withBucket( | ||
| writer: DataFrameWriter[Row], | ||
|
|
@@ -315,8 +317,14 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { | |
| assert( | ||
| joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight, | ||
| s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}") | ||
|
|
||
| if (expectedResult.isDefined) { | ||
| checkAnswer(joined, expectedResult.get) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In L296 we already have an
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will let us validate against some predefined expected output. |
||
| } | ||
| result = joined.collect() | ||
| } | ||
| } | ||
| result | ||
| } | ||
|
|
||
| private def joinCondition(joinCols: Seq[String]) (left: DataFrame, right: DataFrame): Column = { | ||
|
|
@@ -543,6 +551,68 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { | |
| ) | ||
| } | ||
|
|
||
| test("SPARK-19122 Re-order join predicates if they match with the child's output partitioning") { | ||
| val bucketedTableTestSpec = BucketedTableTestSpec( | ||
| Some(BucketSpec(8, Seq("i", "j", "k"), Seq("i", "j", "k"))), | ||
| numPartitions = 1, | ||
| expectedShuffle = false, | ||
| expectedSort = false) | ||
|
|
||
| def testBucketingWithPredicate( | ||
| joinCondition: (DataFrame, DataFrame) => Column, | ||
| expectedResult: Option[Array[Row]]): Array[Row] = { | ||
| testBucketing( | ||
| bucketedTableTestSpecLeft = bucketedTableTestSpec, | ||
| bucketedTableTestSpecRight = bucketedTableTestSpec, | ||
| joinCondition = joinCondition, | ||
| expectedResult = expectedResult | ||
| ) | ||
| } | ||
|
|
||
| // Irrespective of the ordering of keys in the join predicate, the query plan and | ||
| // query results should always be the same | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we don't need to test this here, this is an existing property of join implementation in Spark SQL, and should have already been well tested. Then we don't need to change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed validation of query result |
||
| val result = Some(testBucketingWithPredicate(joinCondition(Seq("i", "j", "k")), None)) | ||
|
|
||
| testBucketingWithPredicate(joinCondition(Seq("i", "k", "j")), result) | ||
| testBucketingWithPredicate(joinCondition(Seq("j", "k", "i")), result) | ||
| testBucketingWithPredicate(joinCondition(Seq("j", "i", "k")), result) | ||
| testBucketingWithPredicate(joinCondition(Seq("k", "i", "j")), result) | ||
| testBucketingWithPredicate(joinCondition(Seq("k", "j", "i")), result) | ||
| } | ||
|
|
||
| test("SPARK-19122 No re-ordering should happen if set of join columns != set of child's " + | ||
|   "partitioning columns") { | ||
|
|
||
|   // Each scenario pairs the bucketing/sort columns shared by both tables with the join columns. | ||
|   val scenarios = Seq( | ||
|     // join predicates are a superset of the child's partitioning columns | ||
|     (Seq("i", "j"), Seq("i", "j", "k")), | ||
|     // child's partitioning columns are a superset of the join predicates | ||
|     (Seq("i", "j", "k"), Seq("i", "j")), | ||
|     // both sets have the same size, yet they contain different columns | ||
|     (Seq("i", "j"), Seq("j", "k"))) | ||
|
|
||
|   scenarios.foreach { case (bucketColumns, joinColumns) => | ||
|     val tableSpec = BucketedTableTestSpec( | ||
|       Some(BucketSpec(8, bucketColumns, bucketColumns)), numPartitions = 1) | ||
|     testBucketing( | ||
|       bucketedTableTestSpecLeft = tableSpec, | ||
|       bucketedTableTestSpecRight = tableSpec, | ||
|       joinCondition = joinCondition(joinColumns) | ||
|     ) | ||
|   } | ||
| } | ||
|
|
||
| test("error if there exists any malformed bucket files") { | ||
| withTable("bucketed_table") { | ||
| df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the same length? Let's say the child partitioning is `a, b, c, d` and the join key is `b, a`; we can reorder the join key to avoid a shuffle, right?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that would be the right thing to do. If the child is partitioned on `a, b, c, d`, it basically means rows are distributed over the hash of `a, b, c, d`. Let's say we have two rows with values of `a, b, c, d` as: [the example rows were lost in extraction; for illustration, take row1 = (1, 2, 3, 4) and row2 = (1, 2, 5, 6)].
If the join key `b, a` is reordered as `a, b` and we want to avoid the shuffle, that would mean we expect the child to have the same values of `a, b` in the same partition. But if you look at row1 and row2 above, even if the values of `a` and `b` are the same, there is no guarantee that they would belong to the same partition, as the partition is based on the hash of all of `a, b, c, d`.
If the join keys are a subset of the partitioning, then a shuffle needs to be done. There is only one exception to this (more of a corner case): https://issues.apache.org/jira/browse/SPARK-18067
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh sorry I made a mistake.
If the child partitioning is `a, b` and the join key is `b, a, c, d`, does it make sense to reorder it as `a, b, c, d`?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`EnsureRequirements` would still add a shuffle in either case, even if we reorder. JOIN would expect data to be distributed over `b, a, c, d` (or `a, b, c, d` if you reorder), which maps to `HashPartitioning(a, b, c, d)`:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
Line 185 in e9c91ba
But the child nodes won't have matching partitioning, i.e. they will have `HashPartitioning(a, b)`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The contract for reordering is that the set of join keys must be equal to the set of child's partitioning columns (implemented at L58-L59 in this file). Thus there won't be reordering for the case you pointed out. I have added a test case of the same.