Skip to content

Commit 5235604

Browse files
committed
Preserve hash join (BHJ and SHJ) stream side ordering
1 parent 4da93b0 commit 5235604

File tree

2 files changed: +40 additions, −0 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala

Lines changed: 2 additions & 0 deletions

Original file line number | Diff line number | Diff line change
@@ -54,6 +54,8 @@ trait HashJoin extends BaseJoinExec {
5454

5555
override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning
5656

57+
override def outputOrdering: Seq[SortOrder] = streamedPlan.outputOrdering
58+
5759
protected lazy val (buildPlan, streamedPlan) = buildSide match {
5860
case BuildLeft => (left, right)
5961
case BuildRight => (right, left)

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 38 additions & 0 deletions

Original file line number | Diff line number | Diff line change
@@ -1104,4 +1104,42 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
11041104
})
11051105
}
11061106
}
1107+
1108+
// Regression test for SPARK-32383: after the HashJoin change, BHJ/SHJ report the
// streamed side's outputOrdering, so a downstream sort-merge join no longer needs
// an extra SortExec on that side.
// NOTE(review): reconstructed from a garbled diff scrape (interleaved diff line
// numbers removed); verify against JoinSuite.scala in the actual commit.
test("SPARK-32383: Preserve hash join (BHJ and SHJ) stream side ordering") {
  val df1 = spark.range(100).select($"id".as("k1"))
  val df2 = spark.range(100).select($"id".as("k2"))
  val df3 = spark.range(3).select($"id".as("k3"))
  val df4 = spark.range(100).select($"id".as("k4"))

  // Test broadcast hash join
  // Threshold of 50 bytes makes only the tiny df3 (3 rows) broadcastable,
  // so the middle join becomes a BHJ between two SMJs.
  withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
    val plan = df1.join(df2, $"k1" === $"k2")
      .join(df3, $"k1" === $"k3")
      .join(df4, $"k1" === $"k4")
      .queryExecution
      .executedPlan
    assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2)
    assert(plan.collect { case _: BroadcastHashJoinExec => true }.size === 1)
    // No extra sort before last sort merge join
    // (3 sorts = the two inputs of the first SMJ plus df4's side of the last SMJ;
    // a 4th sort would mean the BHJ dropped the streamed side's ordering)
    assert(plan.collect { case _: SortExec => true }.size === 3)
  }

  // Test shuffled hash join
  // preferSortMergeJoin=false plus a small build side steers the middle join to SHJ.
  withSQLConf(
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
    SQLConf.SHUFFLE_PARTITIONS.key -> "2",
    SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
    // Shadow df3 with 10 rows: large enough to avoid broadcast, small enough to build.
    val df3 = spark.range(10).select($"id".as("k3"))
    val plan = df1.join(df2, $"k1" === $"k2")
      .join(df3, $"k1" === $"k3")
      .join(df4, $"k1" === $"k4")
      .queryExecution
      .executedPlan
    assert(plan.collect { case _: SortMergeJoinExec => true }.size === 2)
    assert(plan.collect { case _: ShuffledHashJoinExec => true }.size === 1)
    // No extra sort before last sort merge join (same reasoning as the BHJ case)
    assert(plan.collect { case _: SortExec => true }.size === 3)
  }
}
11071145
}

0 commit comments

Comments (0)