Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* 1) broadcasting the left side in a right outer join;
* 2) broadcasting the right side in a left outer, left semi, left anti or existence join;
* 3) broadcasting either side in an inner-like join.
* For other cases, we need to scan the data multiple times, which can be rather slow.
*
* - Shuffle-and-replicate nested loop join (a.k.a. cartesian product join):
* Supports both equi-joins and non-equi-joins.
Expand Down Expand Up @@ -306,16 +307,43 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

// If it is not an equi-join, we first look at the join hints w.r.t. the following order:
// 1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast
// hints, choose the smaller side (based on stats) to broadcast.
// hints, choose the smaller side (based on stats) to broadcast for inner and full joins,
// choose the left side for right join, and choose right side for left join.
// 2. shuffle replicate NL hint: pick cartesian product if join type is inner like.
//
// If there is no hint or the hints are not applicable, we follow these rules one by one:
// 1. Pick cartesian product if join type is inner like, and both sides are too big to
// broadcast.
// 2. Pick broadcast nested loop join. Pick the smaller side (based on stats) to broadcast.
// 1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left
// side is broadcast-able and it's left join, or only right side is broadcast-able and
// it's right join, we skip this rule. If both sides are small, broadcast the smaller
// side for inner and full joins, broadcast the left side for right join, and broadcast
// the right side for left join.
// 2. Pick cartesian product if join type is inner like.
// 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// any other choice. It broadcasts the smaller side for inner and full joins, broadcasts the
// left side for right join, and broadcasts right side for left join.
case logical.Join(left, right, joinType, condition, hint) =>
val desiredBuildSide = if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) {
getSmallerSide(left, right)
} else {
// For perf reasons, `BroadcastNestedLoopJoinExec` prefers to broadcast left side if
// it's a right join, and broadcast right side if it's a left join.
// TODO: revisit it. If left side is much smaller than the right side, it may be better
// to broadcast the left side even if it's a left join.
if (canBuildLeft(joinType)) BuildLeft else BuildRight
}

def createBroadcastNLJoin(buildLeft: Boolean, buildRight: Boolean) = {
getBuildSide(buildLeft, buildRight, left, right).map { buildSide =>
val maybeBuildSide = if (buildLeft && buildRight) {
Some(desiredBuildSide)
} else if (buildLeft) {
Some(BuildLeft)
} else if (buildRight) {
Some(BuildRight)
} else {
None
}

maybeBuildSide.map { buildSide =>
Seq(joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition))
}
Expand All @@ -330,45 +358,18 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}

def createJoinWithoutHint() = {
(if (!canBroadcast(left) && !canBroadcast(right)) createCartesianProduct() else None)
createBroadcastNLJoin(canBroadcast(left), canBroadcast(right))
.orElse(createCartesianProduct())
.getOrElse {
// This join could be very slow or OOM
val buildSide = getSmallerSide(left, right)
Seq(joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition))
planLater(left), planLater(right), desiredBuildSide, joinType, condition))
}
}

if (joinType.isInstanceOf[InnerLike] || joinType == FullOuter) {
createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
.orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
.getOrElse(createJoinWithoutHint())
} else {
val smallerSide = getSmallerSide(left, right)
val buildSide = if (canBuildLeft(joinType)) {
// For RIGHT JOIN, we may broadcast left side even if the hint asks us to broadcast
// the right side. This is for historical reasons.
if (hintToBroadcastLeft(hint) || canBroadcast(left)) {
BuildLeft
} else if (hintToBroadcastRight(hint)) {
BuildRight
} else {
smallerSide
}
} else {
// For LEFT JOIN, we may broadcast right side even if the hint asks us to broadcast
// the left side. This is for historical reasons.
if (hintToBroadcastRight(hint) || canBroadcast(right)) {
BuildRight
} else if (hintToBroadcastLeft(hint)) {
BuildLeft
} else {
smallerSide
}
}
Seq(joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition))
}
createBroadcastNLJoin(hintToBroadcastLeft(hint), hintToBroadcastRight(hint))
.orElse { if (hintToShuffleReplicateNL(hint)) createCartesianProduct() else None }
.getOrElse(createJoinWithoutHint())


// --- Cases where this strategy does not apply ---------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,16 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
val t2Size = spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes
assert(t1Size < t2Size)

/* ######## test cases for equi-join ######### */
// INNER JOIN && t1Size < t2Size => BuildLeft
assertJoinBuildSide(
"SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key", bh, BuildLeft)
// LEFT JOIN => BuildRight
// broadcast hash join can not build left side for left join.
assertJoinBuildSide(
"SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2 ON t1.key = t2.key", bh, BuildRight)
// RIGHT JOIN => BuildLeft
// broadcast hash join can not build right side for right join.
assertJoinBuildSide(
"SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 RIGHT JOIN t2 ON t1.key = t2.key", bh, BuildLeft)
// INNER JOIN && broadcast(t1) => BuildLeft
Expand All @@ -297,16 +300,20 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
assertJoinBuildSide(
"SELECT /*+ MAPJOIN(t2) */ * FROM t1 JOIN t2 ON t1.key = t2.key", bh, BuildRight)


/* ######## test cases for non-equi-join ######### */
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
// INNER JOIN && t1Size < t2Size => BuildLeft
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 JOIN t2", bl, BuildLeft)
// FULL JOIN && t1Size < t2Size => BuildLeft
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 FULL JOIN t2", bl, BuildLeft)
// FULL OUTER && t1Size < t2Size => BuildLeft
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
// LEFT JOIN => BuildRight
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 LEFT JOIN t2", bl, BuildRight)
// RIGHT JOIN => BuildLeft
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1, t2) */ * FROM t1 RIGHT JOIN t2", bl, BuildLeft)

/* #### test with broadcast hint #### */
// INNER JOIN && broadcast(t1) => BuildLeft
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 JOIN t2", bl, BuildLeft)
// INNER JOIN && broadcast(t2) => BuildRight
Expand All @@ -316,8 +323,10 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
// FULL OUTER && broadcast(t2) => BuildRight
assertJoinBuildSide(
"SELECT /*+ MAPJOIN(t2) */ * FROM t1 FULL OUTER JOIN t2", bl, BuildRight)
// FULL OUTER && t1Size < t2Size => BuildLeft
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
// LEFT JOIN && broadcast(t1) => BuildLeft
assertJoinBuildSide("SELECT /*+ MAPJOIN(t1) */ * FROM t1 LEFT JOIN t2", bl, BuildLeft)
// RIGHT JOIN && broadcast(t2) => BuildRight
assertJoinBuildSide("SELECT /*+ MAPJOIN(t2) */ * FROM t1 RIGHT JOIN t2", bl, BuildRight)
}
}
}
Expand All @@ -332,6 +341,7 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
val t2Size = spark.table("t2").queryExecution.analyzed.children.head.stats.sizeInBytes
assert(t1Size < t2Size)

/* ######## test cases for equi-join ######### */
assertJoinBuildSide("SELECT * FROM t1 JOIN t2 ON t1.key = t2.key", bh, BuildLeft)
assertJoinBuildSide("SELECT * FROM t2 JOIN t1 ON t1.key = t2.key", bh, BuildRight)

Expand All @@ -341,13 +351,23 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
assertJoinBuildSide("SELECT * FROM t1 RIGHT JOIN t2 ON t1.key = t2.key", bh, BuildLeft)
assertJoinBuildSide("SELECT * FROM t2 RIGHT JOIN t1 ON t1.key = t2.key", bh, BuildLeft)

/* ######## test cases for non-equi-join ######### */
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
// For full outer join, prefer to broadcast the smaller side.
assertJoinBuildSide("SELECT * FROM t1 FULL OUTER JOIN t2", bl, BuildLeft)
assertJoinBuildSide("SELECT * FROM t2 FULL OUTER JOIN t1", bl, BuildRight)

// For inner join, prefer to broadcast the smaller side, if broadcast-able.
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> (t2Size + 1).toString()) {
assertJoinBuildSide("SELECT * FROM t1 JOIN t2", bl, BuildLeft)
assertJoinBuildSide("SELECT * FROM t2 JOIN t1", bl, BuildRight)
}

// For left join, prefer to broadcast the right side.
assertJoinBuildSide("SELECT * FROM t1 LEFT JOIN t2", bl, BuildRight)
assertJoinBuildSide("SELECT * FROM t2 LEFT JOIN t1", bl, BuildRight)

// For right join, prefer to broadcast the left side.
assertJoinBuildSide("SELECT * FROM t1 RIGHT JOIN t2", bl, BuildLeft)
assertJoinBuildSide("SELECT * FROM t2 RIGHT JOIN t1", bl, BuildLeft)
}
Expand Down