
Commit e9e5f39

wangyum authored and GitHub Enterprise committed
[CARMEL-4174] Backport Hash join PR / Merge pull request #1054 from carmel/hash_join
Backport Hash join PR
2 parents 97b0ece + 590dc6b commit e9e5f39

12 files changed (+307 lines, -77 lines)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/BroadcastJoinOuterJoinStreamSide.scala

Lines changed: 2 additions & 1 deletion
@@ -36,7 +36,8 @@ object BroadcastJoinOuterJoinStreamSide extends Rule[LogicalPlan] with JoinSelec
        LeftOuter | LeftSemi | LeftAnti, _, _) =>
       j
     case j @ ExtractEquiJoinKeys(LeftOuter | LeftSemi | LeftAnti,
-      leftKeys, _, None, left, right, hint) if leftKeys.nonEmpty && muchSmaller(left, right) &&
+      leftKeys, _, None, left, right, hint)
+      if leftKeys.nonEmpty && muchSmaller(left, right, conf) &&
       !(hintToBroadcastRight(hint) || canBroadcastBySize(right, conf)) &&
       (hintToBroadcastLeft(hint) || canBroadcastBySize(left, conf)) =>
       logInfo("BroadcastJoinOuterJoinStreamSide detected.")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala

Lines changed: 30 additions & 5 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils

 /**
  * Reorder the joins and push all the conditions into join, so that the bottom ones have at least
@@ -270,12 +271,18 @@ trait JoinSelectionHelper {
     val buildLeft = if (hintOnly) {
       hintToShuffleHashJoinLeft(hint)
     } else {
-      canBuildLocalHashMapBySize(left, conf) && muchSmaller(left, right)
+      hintToPreferShuffleHashJoinLeft(hint) ||
+        (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(left, conf) &&
+          muchSmaller(left, right, conf)) ||
+        forceApplyShuffledHashJoin(conf)
     }
     val buildRight = if (hintOnly) {
       hintToShuffleHashJoinRight(hint)
     } else {
-      canBuildLocalHashMapBySize(right, conf) && muchSmaller(right, left)
+      hintToPreferShuffleHashJoinRight(hint) ||
+        (!conf.preferSortMergeJoin && canBuildLocalHashMapBySize(right, conf) &&
+          muchSmaller(right, left, conf)) ||
+        forceApplyShuffledHashJoin(conf)
     }
     getBuildSide(
       canBuildShuffledHashJoinLeft(joinType) && buildLeft,
@@ -366,6 +373,14 @@ trait JoinSelectionHelper {
     hint.rightHint.exists(_.strategy.contains(SHUFFLE_HASH))
   }

+  def hintToPreferShuffleHashJoinLeft(hint: JoinHint): Boolean = {
+    hint.leftHint.exists(_.strategy.contains(PREFER_SHUFFLE_HASH))
+  }
+
+  def hintToPreferShuffleHashJoinRight(hint: JoinHint): Boolean = {
+    hint.rightHint.exists(_.strategy.contains(PREFER_SHUFFLE_HASH))
+  }
+
   def hintToSortMergeJoin(hint: JoinHint): Boolean = {
     hint.leftHint.exists(_.strategy.contains(SHUFFLE_MERGE)) ||
       hint.rightHint.exists(_.strategy.contains(SHUFFLE_MERGE))
@@ -405,14 +420,15 @@ trait JoinSelectionHelper {
   }

   /**
-   * Returns whether plan a is much smaller (3X) than plan b.
+   * Returns true if the data size of plan a multiplied by SHUFFLE_HASH_JOIN_FACTOR
+   * is smaller than plan b.
    *
    * The cost to build hash map is higher than sorting, we should only build hash map on a table
    * that is much smaller than other one. Since we does not have the statistic for number of rows,
    * use the size of bytes here as estimation.
    */
-  def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
-    a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
+  def muchSmaller(a: LogicalPlan, b: LogicalPlan, conf: SQLConf): Boolean = {
+    a.stats.sizeInBytes * conf.getConf(SQLConf.SHUFFLE_HASH_JOIN_FACTOR) <= b.stats.sizeInBytes
   }

   def canBroadcastTokenTree(left: LogicalPlan,
@@ -436,5 +452,14 @@ trait JoinSelectionHelper {
     right.stats.sizeInBytes <= conf.containsJoinThreshold &&
       !hintToNotBroadcastRight(hint)
   }
+
+  /**
+   * Returns whether a shuffled hash join should be force applied.
+   * The config key is hard-coded because it's testing only and should not be exposed.
+   */
+  private def forceApplyShuffledHashJoin(conf: SQLConf): Boolean = {
+    Utils.isTesting &&
+      conf.getConfString("spark.sql.join.forceApplyShuffledHashJoin", "false") == "true"
+  }
 }
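For intuition, the build-side test above now reduces to a single size comparison: the candidate build side, multiplied by spark.sql.shuffledHashJoinFactor, must not exceed the other side. A minimal standalone sketch (not from the patch; plain Longs stand in for the BigInt plan statistics and the conf lookup):

// Sketch of the factor-based "much smaller" check; sizeA/sizeB mirror
// a.stats.sizeInBytes / b.stats.sizeInBytes, factor mirrors SHUFFLE_HASH_JOIN_FACTOR.
def muchSmallerBySize(sizeA: Long, sizeB: Long, factor: Long): Boolean =
  sizeA * factor <= sizeB

val mb = 1024L * 1024L
assert(muchSmallerBySize(100 * mb, 400 * mb, factor = 3))   // 300 MB <= 400 MB: eligible
assert(!muchSmallerBySize(100 * mb, 400 * mb, factor = 5))  // 500 MB > 400 MB: not eligible

With the default factor of 3 this reproduces the old hard-coded 3X rule; raising the factor makes shuffled hash join selection more conservative.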

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala

Lines changed: 8 additions & 0 deletions
@@ -197,6 +197,14 @@ case object NO_BROADCAST_HASH extends JoinStrategyHint {
   override def hintAliases: Set[String] = Set.empty
 }

+/**
+ * An internal hint to encourage shuffle hash join, used by adaptive query execution.
+ */
+case object PREFER_SHUFFLE_HASH extends JoinStrategyHint {
+  override def displayName: String = "prefer_shuffle_hash"
+  override def hintAliases: Set[String] = Set.empty
+}
+
 /**
  * The callback for implementing customized strategies of handling hint errors.
  */

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 19 additions & 0 deletions
@@ -498,6 +498,14 @@ object SQLConf {
     .intConf
     .createWithDefault(100000)

+  val SHUFFLE_HASH_JOIN_FACTOR = buildConf("spark.sql.shuffledHashJoinFactor")
+    .doc("The shuffle hash join can be selected if the data size of small" +
+      " side multiplied by this factor is still smaller than the large side.")
+    .version("3.3.0")
+    .intConf
+    .checkValue(_ >= 1, "The shuffle hash join factor cannot be negative.")
+    .createWithDefault(3)
+
   val LIMIT_SCALE_UP_FACTOR = buildConf("spark.sql.limit.scaleUpFactor")
     .internal()
     .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
@@ -696,6 +704,17 @@ object SQLConf {
     .bytesConf(ByteUnit.BYTE)
     .createOptional

+  val ADAPTIVE_MAX_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD =
+    buildConf("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold")
+      .doc("Configures the maximum size in bytes per partition that can be allowed to build " +
+        "local hash map. If this value is not smaller than " +
+        s"${ADVISORY_PARTITION_SIZE_IN_BYTES.key} and all the partition size are not larger " +
+        "than this config, join selection prefer to use shuffled hash join instead of " +
+        s"sort merge join regardless of the value of ${PREFER_SORTMERGEJOIN.key}.")
+      .version("3.2.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(0L)
+
   val SUBEXPRESSION_ELIMINATION_ENABLED =
     buildConf("spark.sql.subexpressionElimination.enabled")
       .internal()
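A hedged example of exercising the two new knobs at session level (the values are illustrative, not recommendations; `spark` is an existing SparkSession). Per the DynamicJoinSelection rule below, the AQE threshold only takes effect when it is at least spark.sql.adaptive.advisoryPartitionSizeInBytes:

// Illustrative session-level settings only.
spark.conf.set("spark.sql.shuffledHashJoinFactor", "5")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
spark.conf.set("spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold", "64MB")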

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 7 deletions
@@ -226,13 +226,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

       def createJoinWithoutHint() = {
         createBroadcastHashJoin(false)
-          .orElse {
-            if (!conf.preferSortMergeJoin) {
-              createShuffleHashJoin(false)
-            } else {
-              None
-            }
-          }
+          .orElse(createShuffleHashJoin(false))
           .orElse(createSortMergeJoin())
           .orElse(createCartesianProduct())
           .getOrElse {
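The preferSortMergeJoin guard is not lost here: it moved into getShuffleHashJoinBuildSide (see joins.scala above), so createShuffleHashJoin(false) simply returns None when no build side is eligible. A tiny sketch of the resulting orElse chain semantics, with strings standing in for the real planner helpers:

// Each candidate is an Option; orElse evaluates the next candidate only if the previous is empty.
val chosen: Option[String] =
  Option.empty[String]                    // broadcast hash join not applicable
    .orElse(Some("shuffled hash join"))   // always consulted now; gated inside the helper
    .orElse(Some("sort merge join"))      // not evaluated, the previous step already succeeded
assert(chosen.contains("shuffled hash join"))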

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ class AQEOptimizer(sparkSession: SparkSession) extends RuleExecutor[LogicalPlan]
   private val conf = sparkSession.sessionState.conf

   private val defaultBatches = Seq(
-    Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin),
+    Batch("Dynamic Join Selection", Once, DynamicJoinSelection),
     Batch("Adaptive Bloom Filter Join", Once, AdaptiveBloomFilterJoin(sparkSession)),
     Batch("Eliminate Join to Empty Relation", Once, EliminateJoinToEmptyRelation),
     Batch("Optimize bloom filter Join", Once, OptimizeBloomFilterJoin)

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DemoteBroadcastHashJoin.scala

Lines changed: 0 additions & 61 deletions
This file was deleted.
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.MapOutputStatistics
+import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Join, JoinStrategyHint, LogicalPlan, NO_BROADCAST_HASH, PREFER_SHUFFLE_HASH, SHUFFLE_HASH}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This optimization rule includes three join selection:
+ *   1. detects a join child that has a high ratio of empty partitions and adds a
+ *      NO_BROADCAST_HASH hint to avoid it being broadcast, as shuffle join is faster in this case:
+ *      many tasks complete immediately since one join side is empty.
+ *   2. detects a join child that every partition size is less than local map threshold and adds a
+ *      PREFER_SHUFFLE_HASH hint to encourage being shuffle hash join instead of sort merge join.
+ *   3. if a join satisfies both NO_BROADCAST_HASH and PREFER_SHUFFLE_HASH,
+ *      then add a SHUFFLE_HASH hint.
+ */
+object DynamicJoinSelection extends Rule[LogicalPlan] with JoinSelectionHelper {
+
+  private def hasManyEmptyPartitions(mapStats: MapOutputStatistics): Boolean = {
+    val partitionCnt = mapStats.bytesByPartitionId.length
+    val nonZeroCnt = mapStats.bytesByPartitionId.count(_ > 0)
+    partitionCnt > 0 && nonZeroCnt > 0 &&
+      (nonZeroCnt * 1.0 / partitionCnt) < conf.nonEmptyPartitionRatioForBroadcastJoin
+  }
+
+  private def preferShuffledHashJoin(mapStats: MapOutputStatistics): Boolean = {
+    val maxShuffledHashJoinLocalMapThreshold =
+      conf.getConf(SQLConf.ADAPTIVE_MAX_SHUFFLE_HASH_JOIN_LOCAL_MAP_THRESHOLD)
+    val advisoryPartitionSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+    if (advisoryPartitionSize <= maxShuffledHashJoinLocalMapThreshold) {
+      mapStats.bytesByPartitionId.forall(_ <= maxShuffledHashJoinLocalMapThreshold)
+    } else {
+      false
+    }
+  }
+
+  private def selectJoinStrategy(
+      join: Join,
+      isLeft: Boolean): Option[JoinStrategyHint] = {
+    val plan = if (isLeft) join.left else join.right
+    plan match {
+      case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.isMaterialized
+        && stage.mapStats.isDefined =>
+
+        val manyEmptyInPlan = hasManyEmptyPartitions(stage.mapStats.get)
+        val canBroadcastPlan = (isLeft && canBuildBroadcastLeft(join.joinType)) ||
+          (!isLeft && canBuildBroadcastRight(join.joinType))
+        val manyEmptyInOther = (if (isLeft) join.right else join.left) match {
+          case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.isMaterialized
+            && stage.mapStats.isDefined => hasManyEmptyPartitions(stage.mapStats.get)
+          case _ => false
+        }
+
+        val demoteBroadcastHash = if (manyEmptyInPlan && canBroadcastPlan) {
+          join.joinType match {
+            // don't demote BHJ since you cannot short circuit local join if inner (null-filled)
+            // side is empty
+            case LeftOuter | RightOuter | LeftAnti => false
+            case _ => true
+          }
+        } else if (manyEmptyInOther && canBroadcastPlan) {
+          // for example, LOJ, !isLeft but it's the LHS that has many empty partitions if we
+          // proceed with shuffle. But if we proceed with BHJ, the OptimizeShuffleWithLocalRead
+          // will assemble partitions as they were before the shuffle and that may no longer have
+          // many empty partitions and thus cannot short-circuit local join
+          join.joinType match {
+            case LeftOuter | RightOuter | LeftAnti => true
+            case _ => false
+          }
+        } else {
+          false
+        }
+        val rowNumberExceeded =
+          stage.computeStats().exists(_.rowCount.exists(_.toLong >= conf.broadcastMaxRowNum))
+        val adjustDemoteBroadcastHash = rowNumberExceeded || demoteBroadcastHash
+
+        val preferShuffleHash = preferShuffledHashJoin(stage.mapStats.get)
+        if (adjustDemoteBroadcastHash && preferShuffleHash) {
+          Some(SHUFFLE_HASH)
+        } else if (adjustDemoteBroadcastHash) {
+          Some(NO_BROADCAST_HASH)
+        } else if (preferShuffleHash) {
+          Some(PREFER_SHUFFLE_HASH)
+        } else {
+          None
+        }
+
+      case _ => None
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
+    case j @ ExtractEquiJoinKeys(_, _, _, _, _, _, hint) =>
+      var newHint = hint
+      if (!hint.leftHint.exists(_.strategy.isDefined)) {
+        selectJoinStrategy(j, true).foreach { strategy =>
+          newHint = newHint.copy(leftHint =
+            Some(hint.leftHint.getOrElse(HintInfo()).copy(strategy = Some(strategy))))
+        }
+      }
+      if (!hint.rightHint.exists(_.strategy.isDefined)) {
+        selectJoinStrategy(j, false).foreach { strategy =>
+          newHint = newHint.copy(rightHint =
+            Some(hint.rightHint.getOrElse(HintInfo()).copy(strategy = Some(strategy))))
+        }
+      }
+      if (newHint.ne(hint)) {
+        j.copy(hint = newHint)
+      } else {
+        j
+      }
+  }
+}
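The final if/else in selectJoinStrategy combines two independent signals. A standalone sketch of that decision table (illustrative only, with strings standing in for the JoinStrategyHint objects):

// How the demote-broadcast and prefer-shuffle-hash signals combine into a hint.
def pickHint(demoteBroadcast: Boolean, preferShuffleHash: Boolean): Option[String] =
  (demoteBroadcast, preferShuffleHash) match {
    case (true, true)   => Some("SHUFFLE_HASH")        // both: pin shuffled hash join
    case (true, false)  => Some("NO_BROADCAST_HASH")   // only block broadcast
    case (false, true)  => Some("PREFER_SHUFFLE_HASH") // only encourage shuffled hash join
    case (false, false) => None                        // leave the join untouched
  }

assert(pickHint(demoteBroadcast = true, preferShuffleHash = false).contains("NO_BROADCAST_HASH"))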

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala

Lines changed: 1 addition & 0 deletions
@@ -108,6 +108,7 @@ abstract class QueryStageExec extends LeafExecNode {
   protected var _resultOption = new AtomicReference[Option[Any]](None)

   private[adaptive] def resultOption: AtomicReference[Option[Any]] = _resultOption
+  def isMaterialized: Boolean = resultOption.get().isDefined

   override def output: Seq[Attribute] = plan.output
   override def outputPartitioning: Partitioning = plan.outputPartitioning

sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@

 --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=10485760
 --CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=true
---CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.preferSortMergeJoin=false
+--CONFIG_DIM1 spark.sql.autoBroadcastJoinThreshold=-1,spark.sql.join.forceApplyShuffledHashJoin=true

 --CONFIG_DIM2 spark.sql.codegen.wholeStage=true
 --CONFIG_DIM2 spark.sql.codegen.wholeStage=false,spark.sql.codegen.factoryMode=CODEGEN_ONLY
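Beyond the golden-file config dimensions, the testing-only switch could also be exercised from a Scala suite. A hypothetical sketch (not part of this commit), assuming a SharedSparkSession-style suite with Spark's withSQLConf test helper; the flag is honored only when Utils.isTesting is true (see joins.scala above):

// Hypothetical test: with broadcast disabled and the force flag on,
// the planner should pick a shuffled hash join for a simple equi-join.
test("equi-join uses shuffled hash join when forced") {
  withSQLConf(
      "spark.sql.autoBroadcastJoinThreshold" -> "-1",
      "spark.sql.join.forceApplyShuffledHashJoin" -> "true") {
    val df = spark.range(100).join(spark.range(100), "id")
    assert(df.queryExecution.sparkPlan.toString.contains("ShuffledHashJoin"))
  }
}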
