Skip to content

Commit 01cf8a4

Browse files
c21 authored and cloud-fan committed
[SPARK-32383][SQL] Preserve hash join (BHJ and SHJ) stream side ordering
### What changes were proposed in this pull request? Currently `BroadcastHashJoinExec` and `ShuffledHashJoinExec` do not preserve children output ordering information (inherit from `SparkPlan.outputOrdering`, which is Nil). This can add unnecessary sort in complex queries involved multiple joins. Example: ``` withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") { val df1 = spark.range(100).select($"id".as("k1")) val df2 = spark.range(100).select($"id".as("k2")) val df3 = spark.range(3).select($"id".as("k3")) val df4 = spark.range(100).select($"id".as("k4")) val plan = df1.join(df2, $"k1" === $"k2") .join(df3, $"k1" === $"k3") .join(df4, $"k1" === $"k4") .queryExecution .executedPlan } ``` Current physical plan (extra sort on `k1` before top sort merge join): ``` *(9) SortMergeJoin [k1#220L], [k4#232L], Inner :- *(6) Sort [k1#220L ASC NULLS FIRST], false, 0 : +- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight : :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner : : :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0 : : : +- Exchange hashpartitioning(k1#220L, 5), true, [id=#128] : : : +- *(1) Project [id#218L AS k1#220L] : : : +- *(1) Range (0, 100, step=1, splits=2) : : +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(k2#224L, 5), true, [id=#134] : : +- *(3) Project [id#222L AS k2#224L] : : +- *(3) Range (0, 100, step=1, splits=2) : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#141] : +- *(5) Project [id#226L AS k3#228L] : +- *(5) Range (0, 3, step=1, splits=2) +- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(k4#232L, 5), true, [id=#148] +- *(7) Project [id#230L AS k4#232L] +- *(7) Range (0, 100, step=1, splits=2) ``` Ideal physical plan (no extra sort on `k1` before top sort merge join): ``` *(9) SortMergeJoin [k1#220L], [k4#232L], Inner :- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight : :- *(6) SortMergeJoin [k1#220L], 
[k2#224L], Inner : : :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0 : : : +- Exchange hashpartitioning(k1#220L, 5), true, [id=#127] : : : +- *(1) Project [id#218L AS k1#220L] : : : +- *(1) Range (0, 100, step=1, splits=2) : : +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0 : : +- Exchange hashpartitioning(k2#224L, 5), true, [id=#133] : : +- *(3) Project [id#222L AS k2#224L] : : +- *(3) Range (0, 100, step=1, splits=2) : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#140] : +- *(5) Project [id#226L AS k3#228L] : +- *(5) Range (0, 3, step=1, splits=2) +- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(k4#232L, 5), true, [id=#146] +- *(7) Project [id#230L AS k4#232L] +- *(7) Range (0, 100, step=1, splits=2) ``` ### Why are the changes needed? To avoid unnecessary sort in query, and it has most impact when users read sorted bucketed table. Though the unnecessary sort is operating on already sorted data, it would have obvious negative impact on IO and query run time if the data is large and external sorting happens. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added unit test in `JoinSuite`. Closes #29181 from c21/ordering. Authored-by: Cheng Su <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 13c64c2 commit 01cf8a4

File tree

2 files changed

+78
-1
lines changed

2 files changed

+78
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,41 @@ trait HashJoin extends BaseJoinExec {
5252
}
5353
}
5454

// Preserve the streamed side's partitioning: a hash join only probes the
// build-side hashed relation, so stream-side rows keep their distribution.
// Reject join types that are invalid for the given build side.
override def outputPartitioning: Partitioning = (buildSide, joinType) match {
  case (BuildLeft, _: InnerLike | RightOuter) =>
    right.outputPartitioning
  case (BuildLeft, x) =>
    throw new IllegalArgumentException(
      s"HashJoin should not take $x as the JoinType with building left side")
  case (BuildRight, _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin) =>
    left.outputPartitioning
  case (BuildRight, x) =>
    throw new IllegalArgumentException(
      s"HashJoin should not take $x as the JoinType with building right side")
}

// Preserve the streamed side's output ordering for the same reason: probing
// the hashed relation emits stream-side rows in their incoming order, so no
// downstream Sort is needed on already-sorted stream-side data (SPARK-32383).
override def outputOrdering: Seq[SortOrder] = (buildSide, joinType) match {
  case (BuildLeft, _: InnerLike | RightOuter) =>
    right.outputOrdering
  case (BuildLeft, x) =>
    throw new IllegalArgumentException(
      s"HashJoin should not take $x as the JoinType with building left side")
  case (BuildRight, _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin) =>
    left.outputOrdering
  case (BuildRight, x) =>
    throw new IllegalArgumentException(
      s"HashJoin should not take $x as the JoinType with building right side")
}
5690

5791
protected lazy val (buildPlan, streamedPlan) = buildSide match {
5892
case BuildLeft => (left, right)

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,4 +1104,47 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
11041104
})
11051105
}
11061106
}
1107+
1108+
// SPARK-32383: hash joins (BHJ and SHJ) must propagate the stream side's
// ordering so the planner does not insert a redundant Sort before a
// downstream sort merge join on the same key.
test("SPARK-32383: Preserve hash join (BHJ and SHJ) stream side ordering") {
  val df1 = spark.range(100).select($"id".as("k1"))
  val df2 = spark.range(100).select($"id".as("k2"))
  val df3 = spark.range(3).select($"id".as("k3"))
  val df4 = spark.range(100).select($"id".as("k4"))

  // Broadcast hash join: df3 is small enough to fall under the threshold.
  withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
    for (joinType <- Seq("inner", "left_outer")) {
      val plan = df1.join(df2, $"k1" === $"k2", joinType)
        .join(df3, $"k1" === $"k3", joinType)
        .join(df4, $"k1" === $"k4", joinType)
        .queryExecution
        .executedPlan
      assert(plan.collect { case j: SortMergeJoinExec => j }.size === 2)
      assert(plan.collect { case j: BroadcastHashJoinExec => j }.size === 1)
      // Only the shuffle-side sorts remain — no extra sort on k1 before the
      // top-level sort merge join.
      assert(plan.collect { case s: SortExec => s }.size === 3)
    }
  }

  // Shuffled hash join: disable the sort-merge preference and keep the
  // build side small relative to the threshold.
  withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
    SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
    val df3 = spark.range(10).select($"id".as("k3"))

    for (joinType <- Seq("inner", "left_outer")) {
      val plan = df1.join(df2, $"k1" === $"k2", joinType)
        .join(df3, $"k1" === $"k3", joinType)
        .join(df4, $"k1" === $"k4", joinType)
        .queryExecution
        .executedPlan
      assert(plan.collect { case j: SortMergeJoinExec => j }.size === 2)
      assert(plan.collect { case j: ShuffledHashJoinExec => j }.size === 1)
      // No extra sort before the last sort merge join.
      assert(plan.collect { case s: SortExec => s }.size === 3)
    }
  }
}
11071150
}

0 commit comments

Comments
 (0)