Skip to content

Commit

Permalink
[SPARK-36612][SQL] Support left outer join build left or right outer …
Browse files Browse the repository at this point in the history
…join build right in shuffled hash join

### What changes were proposed in this pull request?
Add support for shuffle-hash join for following scenarios:

* left outer join with left-side build
* right outer join with right-side build

The algorithm is similar to SPARK-32399, which supports shuffle-hash join for full outer join.

The same methods fullOuterJoinWithUniqueKey and fullOuterJoinWithNonUniqueKey are improved to support the new case. These methods are called after the HashedRelation is already constructed of the build side, and do these two iterations:

1.  Iterate Stream side.
  a. If find match on build side, mark.
  b. If no match on build side, join with null build-side row and add to result
2. Iterate build side.
  a. If find marked for match, add joined row to result
  b. If no match marked, join with null stream-side row

The left outer join with left-side build, and right outer join with right-side build, need only a subset of these logics, namely replacing 1b above with a no-op.

Codegen is left for a follow-up PR.

### Why are the changes needed?
For joins of these types, shuffle-hash join can be more performant than sort-merge join, especially if the big table is large, as it skips an expensive sort of the big table.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Unit test in JoinSuite.scala

Closes apache#41398 from szehon-ho/same_side_outer_build_join_master.

Authored-by: Szehon Ho <[email protected]>
Signed-off-by: huaxingao <[email protected]>
(cherry picked from commit 0effbec)
  • Loading branch information
szehon-ho authored and huaxingao committed Jun 2, 2023
1 parent 07fcfa7 commit bafdd47
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,14 @@ trait JoinSelectionHelper {

def canBuildShuffledHashJoinLeft(joinType: JoinType): Boolean = {
joinType match {
case _: InnerLike | RightOuter | FullOuter => true
case _: InnerLike | LeftOuter | FullOuter | RightOuter => true
case _ => false
}
}

def canBuildShuffledHashJoinRight(joinType: JoinType): Boolean = {
joinType match {
case _: InnerLike | LeftOuter | FullOuter |
case _: InnerLike | LeftOuter | FullOuter | RightOuter |
LeftSemi | LeftAnti | _: ExistenceJoin => true
case _ => false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ private[execution] object HashedRelation {
* Create a HashedRelation from an Iterator of InternalRow.
*
* @param allowsNullKey Allow NULL keys in HashedRelation.
* This is used for full outer join in `ShuffledHashJoinExec` only.
* @param ignoresDuplicatedKey Ignore rows with duplicated keys in HashedRelation.
* This is only used for semi and anti join without join condition in
* `ShuffledHashJoinExec` only.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,12 @@ case class ShuffledHashJoinExec(
override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning

override def outputOrdering: Seq[SortOrder] = joinType match {
// For outer joins where the outer side is build-side, order cannot be guaranteed.
// The algorithm performs an additional un-ordered iteration on build-side (HashedRelation)
// to find unmatched rows to satisfy the outer join semantic.
case FullOuter => Nil
case LeftOuter if buildSide == BuildLeft => Nil
case RightOuter if buildSide == BuildRight => Nil
case _ => super.outputOrdering
}

Expand All @@ -83,8 +88,10 @@ case class ShuffledHashJoinExec(
iter,
buildBoundKeys,
taskMemoryManager = context.taskMemoryManager(),
// Full outer join needs support for NULL key in HashedRelation.
allowsNullKey = joinType == FullOuter,
// build-side or full outer join needs support for NULL key in HashedRelation.
allowsNullKey = joinType == FullOuter ||
(joinType == LeftOuter && buildSide == BuildLeft) ||
(joinType == RightOuter && buildSide == BuildRight),
ignoresDuplicatedKey = ignoreDuplicatedKey)
buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
buildDataSize += relation.estimatedSize
Expand All @@ -98,16 +105,22 @@ case class ShuffledHashJoinExec(
streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
val hashed = buildHashedRelation(buildIter)
joinType match {
case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
case FullOuter => buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows,
isFullOuterJoin = true)
case LeftOuter if buildSide.equals(BuildLeft) =>
buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false)
case RightOuter if buildSide.equals(BuildRight) =>
buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false)
case _ => join(streamIter, hashed, numOutputRows)
}
}
}

private def fullOuterJoin(
private def buildSideOrFullOuterJoin(
streamIter: Iterator[InternalRow],
hashedRelation: HashedRelation,
numOutputRows: SQLMetric): Iterator[InternalRow] = {
numOutputRows: SQLMetric,
isFullOuterJoin: Boolean): Iterator[InternalRow] = {
val joinKeys = streamSideKeyGenerator()
val joinRow = new JoinedRow
val (joinRowWithStream, joinRowWithBuild) = {
Expand All @@ -130,11 +143,11 @@ case class ShuffledHashJoinExec(
}

val iter = if (hashedRelation.keyIsUnique) {
fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
buildSideOrFullOuterJoinUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin)
} else {
fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
buildSideOrFullOuterJoinNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin)
}

val resultProj = UnsafeProjection.create(output, output)
Expand All @@ -145,31 +158,38 @@ case class ShuffledHashJoinExec(
}

/**
* Full outer shuffled hash join with unique join keys:
* Shuffled hash join with unique join keys, where an outer side is the build side.
* 1. Process rows from stream side by looking up hash relation.
* Mark the matched rows from build side be looked up.
* A bit set is used to track matched rows with key index.
* 2. Process rows from build side by iterating hash relation.
* Filter out rows from build side being matched already,
* by checking key index from bit set.
*/
private def fullOuterJoinWithUniqueKey(
private def buildSideOrFullOuterJoinUniqueKey(
streamIter: Iterator[InternalRow],
hashedRelation: HashedRelation,
joinKeys: UnsafeProjection,
joinRowWithStream: InternalRow => JoinedRow,
joinRowWithBuild: InternalRow => JoinedRow,
streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
buildNullRow: GenericInternalRow,
isFullOuterJoin: Boolean): Iterator[InternalRow] = {
val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
longMetric("buildDataSize") += matchedKeys.capacity / 8

def noMatch = if (isFullOuterJoin) {
Some(joinRowWithBuild(buildNullRow))
} else {
None
}

// Process stream side with looking up hash relation
val streamResultIter = streamIter.map { srow =>
val streamResultIter = streamIter.flatMap { srow =>
joinRowWithStream(srow)
val keys = joinKeys(srow)
if (keys.anyNull) {
joinRowWithBuild(buildNullRow)
noMatch
} else {
val matched = hashedRelation.getValueWithKeyIndex(keys)
if (matched != null) {
Expand All @@ -178,12 +198,12 @@ case class ShuffledHashJoinExec(
val joinRow = joinRowWithBuild(buildRow)
if (boundCondition(joinRow)) {
matchedKeys.set(keyIndex)
joinRow
Some(joinRow)
} else {
joinRowWithBuild(buildNullRow)
noMatch
}
} else {
joinRowWithBuild(buildNullRow)
noMatch
}
}
}
Expand All @@ -205,7 +225,7 @@ case class ShuffledHashJoinExec(
}

/**
* Full outer shuffled hash join with non-unique join keys:
* Shuffled hash join with non-unique join keys, where an outer side is the build side.
* 1. Process rows from stream side by looking up hash relation.
* Mark the matched rows from build side be looked up.
* A [[OpenHashSet]] (Long) is used to track matched rows with
Expand All @@ -219,14 +239,15 @@ case class ShuffledHashJoinExec(
* the value indices of its tuples will be 0, 1 and 2.
* Note that value indices of tuples with different keys are incomparable.
*/
private def fullOuterJoinWithNonUniqueKey(
private def buildSideOrFullOuterJoinNonUniqueKey(
streamIter: Iterator[InternalRow],
hashedRelation: HashedRelation,
joinKeys: UnsafeProjection,
joinRowWithStream: InternalRow => JoinedRow,
joinRowWithBuild: InternalRow => JoinedRow,
streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
buildNullRow: GenericInternalRow,
isFullOuterJoin: Boolean): Iterator[InternalRow] = {
val matchedRows = new OpenHashSet[Long]
TaskContext.get().addTaskCompletionListener[Unit](_ => {
// At the end of the task, update the task's memory usage for this
Expand All @@ -252,7 +273,12 @@ case class ShuffledHashJoinExec(
val joinRow = joinRowWithStream(srow)
val keys = joinKeys(srow)
if (keys.anyNull) {
Iterator.single(joinRowWithBuild(buildNullRow))
// return row with build side NULL row to satisfy full outer join semantics if enabled
if (isFullOuterJoin) {
Iterator.single(joinRowWithBuild(buildNullRow))
} else {
Iterator.empty
}
} else {
val buildIter = hashedRelation.getWithKeyIndex(keys)
new RowIterator {
Expand All @@ -272,8 +298,8 @@ case class ShuffledHashJoinExec(
}
// When we reach here, it means no match is found for this key.
// So we need to return one row with build side NULL row,
// to satisfy the full outer join semantic.
if (!found) {
// to satisfy the full outer join semantic if enabled.
if (!found && isFullOuterJoin) {
joinRowWithBuild(buildNullRow)
// Set `found` to be true as we only need to return one row
// but no more.
Expand Down Expand Up @@ -314,6 +340,8 @@ case class ShuffledHashJoinExec(

override def supportCodegen: Boolean = joinType match {
case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN)
case LeftOuter if buildSide == BuildLeft => false
case RightOuter if buildSide == BuildRight => false
case _ => true
}

Expand Down
30 changes: 21 additions & 9 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -489,10 +489,16 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
assertShuffleHashJoin(
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft)
assertShuffleHashJoin(
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight)
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildLeft)
assertShuffleHashJoin(
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "right")), BuildLeft)

// Determine build side based on hint
assertShuffleHashJoin(
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildLeft)
assertShuffleHashJoin(
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: Nil, "right")), BuildRight)

// Shuffle-hash hint prioritized over shuffle-replicate-nl hint
assertShuffleHashJoin(
sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: "SHUFFLE_HASH(t1)" :: Nil)),
Expand All @@ -507,8 +513,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
BuildLeft)

// Shuffle-hash hint specified but not doable
assertBroadcastHashJoin(
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildRight)
assertBroadcastNLJoin(
sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil)), BuildLeft)
}
Expand Down Expand Up @@ -606,13 +610,25 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
withLogAppender(hintAppender, level = Some(Level.WARN)) {
assertShuffleMergeJoin(
df1.hint("BROADCAST").join(df2, $"a1" === $"b1", joinType))
}

val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
.filter(_.contains("is not supported in the query:"))
assert(logs.size === 1)
logs.foreach(log =>
assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
}

Seq("left_semi", "left_anti").foreach { joinType =>
val hintAppender = new LogAppender(s"join hint build side check for $joinType")
withLogAppender(hintAppender, level = Some(Level.WARN)) {
assertShuffleMergeJoin(
df1.hint("SHUFFLE_HASH").join(df2, $"a1" === $"b1", joinType))
}

val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
.filter(_.contains("is not supported in the query:"))
assert(logs.size === 2)
assert(logs.size === 1)
logs.foreach(log =>
assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
}
Expand All @@ -622,8 +638,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
withLogAppender(hintAppender, level = Some(Level.WARN)) {
assertBroadcastHashJoin(
df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType), BuildRight)
assertShuffleHashJoin(
df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType), BuildRight)
}

val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
Expand All @@ -636,12 +650,10 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
withLogAppender(hintAppender, level = Some(Level.WARN)) {
assertShuffleMergeJoin(
df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType))
assertShuffleMergeJoin(
df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType))
}
val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
.filter(_.contains("is not supported in the query:"))
assert(logs.size === 2)
assert(logs.size === 1)
logs.foreach(log =>
assert(log.contains(s"build right for ${joinType.split("_").mkString(" ")} join.")))
}
Expand Down
77 changes: 77 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,83 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
}

test("SPARK-36612: Support left outer join build left or right outer join build right in " +
"shuffled hash join") {
val inputDFs = Seq(
// Test unique join key
(spark.range(10).selectExpr("id as k1"),
spark.range(30).selectExpr("id as k2"),
$"k1" === $"k2"),
// Test non-unique join key
(spark.range(10).selectExpr("id % 5 as k1"),
spark.range(30).selectExpr("id % 5 as k2"),
$"k1" === $"k2"),
// Test empty build side
(spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
spark.range(30).selectExpr("id as k2"),
$"k1" === $"k2"),
// Test empty stream side
(spark.range(10).selectExpr("id as k1"),
spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
$"k1" === $"k2"),
// Test empty build and stream side
(spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
$"k1" === $"k2"),
// Test string join key
(spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
spark.range(30).selectExpr("cast(id as string) as k2"),
$"k1" === $"k2"),
// Test build side at right
(spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
spark.range(10).selectExpr("cast(id as string) as k2"),
$"k1" === $"k2"),
// Test NULL join key
(spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
$"k1" === $"k2"),
(spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"),
spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"),
$"k1" === $"k2"),
// Test multiple join keys
(spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
"value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"),
spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr(
"value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"),
$"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
)

// test left outer with left side build
inputDFs.foreach { case (df1, df2, joinExprs) =>
val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter")
assert(collect(smjDF.queryExecution.executedPlan) {
case _: SortMergeJoinExec => true }.size === 1)
val smjResult = smjDF.collect()

val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter")
assert(collect(shjDF.queryExecution.executedPlan) {
case _: ShuffledHashJoinExec => true
}.size === 1)
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
}

// test right outer with right side build
inputDFs.foreach { case (df2, df1, joinExprs) =>
val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter")
assert(collect(smjDF.queryExecution.executedPlan) {
case _: SortMergeJoinExec => true }.size === 1)
val smjResult = smjDF.collect()

val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter")
assert(collect(shjDF.queryExecution.executedPlan) {
case _: ShuffledHashJoinExec => true
}.size === 1)
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
}
}

test("SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation") {
val inputDFs = Seq(
// Test empty build side for inner join
Expand Down

0 comments on commit bafdd47

Please sign in to comment.