From bafdd475d253f40d2127efbe61d19f60f3415935 Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Fri, 2 Jun 2023 10:48:32 -0700 Subject: [PATCH] [SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join ### What changes were proposed in this pull request? Add support for shuffle-hash join for following scenarios: * left outer join with left-side build * right outer join with right-side build The algorithm is similar to SPARK-32399, which supports shuffle-hash join for full outer join. The same methods fullOuterJoinWithUniqueKey and fullOuterJoinWithNonUniqueKey are improved to support the new case. These methods are called after the HashedRelation is already constructed of the build side, and do these two iterations: 1. Iterate Stream side. a. If find match on build side, mark. b. If no match on build side, join with null build-side row and add to result 2. Iterate build side. a. If find marked for match, add joined row to result b. If no match marked, join with null stream-side row The left outer join with left-side build, and right outer join with right-side build, need only a subset of these logics, namely replacing 1b above with a no-op. Codegen is left for a follow-up PR. ### Why are the changes needed? For joins of these types, shuffle-hash join can be more performant than sort-merge join, especially if the big table is large, as it skips an expensive sort of the big table. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test in JoinSuite.scala Closes #41398 from szehon-ho/same_side_outer_build_join_master. Authored-by: Szehon Ho Signed-off-by: huaxingao (cherry picked from commit 0effbec16edc27c644d4089bdf266cd4ecbed235) --- .../spark/sql/catalyst/optimizer/joins.scala | 4 +- .../sql/execution/joins/HashedRelation.scala | 1 - .../joins/ShuffledHashJoinExec.scala | 74 ++++++++++++------ .../org/apache/spark/sql/JoinHintSuite.scala | 30 +++++--- .../org/apache/spark/sql/JoinSuite.scala | 77 +++++++++++++++++++ 5 files changed, 151 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 796e7d8f89d1a..f15ab7047566a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -374,14 +374,14 @@ trait JoinSelectionHelper { def canBuildShuffledHashJoinLeft(joinType: JoinType): Boolean = { joinType match { - case _: InnerLike | RightOuter | FullOuter => true + case _: InnerLike | LeftOuter | FullOuter | RightOuter => true case _ => false } } def canBuildShuffledHashJoinRight(joinType: JoinType): Boolean = { joinType match { - case _: InnerLike | LeftOuter | FullOuter | + case _: InnerLike | LeftOuter | FullOuter | RightOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 4d3e63282fabf..16345bb35db2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -127,7 +127,6 @@ private[execution] object HashedRelation { * Create a HashedRelation from an Iterator of InternalRow. * * @param allowsNullKey Allow NULL keys in HashedRelation. - * This is used for full outer join in `ShuffledHashJoinExec` only. * @param ignoresDuplicatedKey Ignore rows with duplicated keys in HashedRelation. * This is only used for semi and anti join without join condition in * `ShuffledHashJoinExec` only. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index cfe35d04778fb..8953bf19f35ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -56,7 +56,12 @@ case class ShuffledHashJoinExec( override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning override def outputOrdering: Seq[SortOrder] = joinType match { + // For outer joins where the outer side is build-side, order cannot be guaranteed. + // The algorithm performs an additional un-ordered iteration on build-side (HashedRelation) + // to find unmatched rows to satisfy the outer join semantic. case FullOuter => Nil + case LeftOuter if buildSide == BuildLeft => Nil + case RightOuter if buildSide == BuildRight => Nil case _ => super.outputOrdering } @@ -83,8 +88,10 @@ case class ShuffledHashJoinExec( iter, buildBoundKeys, taskMemoryManager = context.taskMemoryManager(), - // Full outer join needs support for NULL key in HashedRelation. - allowsNullKey = joinType == FullOuter, + // build-side or full outer join needs support for NULL key in HashedRelation. + allowsNullKey = joinType == FullOuter || + (joinType == LeftOuter && buildSide == BuildLeft) || + (joinType == RightOuter && buildSide == BuildRight), ignoresDuplicatedKey = ignoreDuplicatedKey) buildTime += NANOSECONDS.toMillis(System.nanoTime() - start) buildDataSize += relation.estimatedSize @@ -98,16 +105,22 @@ case class ShuffledHashJoinExec( streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) => val hashed = buildHashedRelation(buildIter) joinType match { - case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows) + case FullOuter => buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, + isFullOuterJoin = true) + case LeftOuter if buildSide.equals(BuildLeft) => + buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false) + case RightOuter if buildSide.equals(BuildRight) => + buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false) case _ => join(streamIter, hashed, numOutputRows) } } } - private def fullOuterJoin( + private def buildSideOrFullOuterJoin( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, - numOutputRows: SQLMetric): Iterator[InternalRow] = { + numOutputRows: SQLMetric, + isFullOuterJoin: Boolean): Iterator[InternalRow] = { val joinKeys = streamSideKeyGenerator() val joinRow = new JoinedRow val (joinRowWithStream, joinRowWithBuild) = { @@ -130,11 +143,11 @@ case class ShuffledHashJoinExec( } val iter = if (hashedRelation.keyIsUnique) { - fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, - joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow) + buildSideOrFullOuterJoinUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, + joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin) } else { - fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, - joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow) + buildSideOrFullOuterJoinNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream, + joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin) } val resultProj = UnsafeProjection.create(output, output) @@ -145,7 +158,7 @@ case class ShuffledHashJoinExec( } /** - * Full outer shuffled hash join with unique join keys: + * Shuffled hash join with unique join keys, where an outer side is the build side. * 1. Process rows from stream side by looking up hash relation. * Mark the matched rows from build side be looked up. * A bit set is used to track matched rows with key index. @@ -153,23 +166,30 @@ case class ShuffledHashJoinExec( * Filter out rows from build side being matched already, * by checking key index from bit set. */ - private def fullOuterJoinWithUniqueKey( + private def buildSideOrFullOuterJoinUniqueKey( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, joinKeys: UnsafeProjection, joinRowWithStream: InternalRow => JoinedRow, joinRowWithBuild: InternalRow => JoinedRow, streamNullJoinRowWithBuild: => InternalRow => JoinedRow, - buildNullRow: GenericInternalRow): Iterator[InternalRow] = { + buildNullRow: GenericInternalRow, + isFullOuterJoin: Boolean): Iterator[InternalRow] = { val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex) longMetric("buildDataSize") += matchedKeys.capacity / 8 + def noMatch = if (isFullOuterJoin) { + Some(joinRowWithBuild(buildNullRow)) + } else { + None + } + // Process stream side with looking up hash relation - val streamResultIter = streamIter.map { srow => + val streamResultIter = streamIter.flatMap { srow => joinRowWithStream(srow) val keys = joinKeys(srow) if (keys.anyNull) { - joinRowWithBuild(buildNullRow) + noMatch } else { val matched = hashedRelation.getValueWithKeyIndex(keys) if (matched != null) { @@ -178,12 +198,12 @@ case class ShuffledHashJoinExec( val joinRow = joinRowWithBuild(buildRow) if (boundCondition(joinRow)) { matchedKeys.set(keyIndex) - joinRow + Some(joinRow) } else { - joinRowWithBuild(buildNullRow) + noMatch } } else { - joinRowWithBuild(buildNullRow) + noMatch } } } @@ -205,7 +225,7 @@ case class ShuffledHashJoinExec( } /** - * Full outer shuffled hash join with non-unique join keys: + * Shuffled hash join with non-unique join keys, where an outer side is the build side. * 1. Process rows from stream side by looking up hash relation. * Mark the matched rows from build side be looked up. * A [[OpenHashSet]] (Long) is used to track matched rows with @@ -219,14 +239,15 @@ case class ShuffledHashJoinExec( * the value indices of its tuples will be 0, 1 and 2. * Note that value indices of tuples with different keys are incomparable. */ - private def fullOuterJoinWithNonUniqueKey( + private def buildSideOrFullOuterJoinNonUniqueKey( streamIter: Iterator[InternalRow], hashedRelation: HashedRelation, joinKeys: UnsafeProjection, joinRowWithStream: InternalRow => JoinedRow, joinRowWithBuild: InternalRow => JoinedRow, streamNullJoinRowWithBuild: => InternalRow => JoinedRow, - buildNullRow: GenericInternalRow): Iterator[InternalRow] = { + buildNullRow: GenericInternalRow, + isFullOuterJoin: Boolean): Iterator[InternalRow] = { val matchedRows = new OpenHashSet[Long] TaskContext.get().addTaskCompletionListener[Unit](_ => { // At the end of the task, update the task's memory usage for this @@ -252,7 +273,12 @@ case class ShuffledHashJoinExec( val joinRow = joinRowWithStream(srow) val keys = joinKeys(srow) if (keys.anyNull) { - Iterator.single(joinRowWithBuild(buildNullRow)) + // return row with build side NULL row to satisfy full outer join semantics if enabled + if (isFullOuterJoin) { + Iterator.single(joinRowWithBuild(buildNullRow)) + } else { + Iterator.empty + } } else { val buildIter = hashedRelation.getWithKeyIndex(keys) new RowIterator { @@ -272,8 +298,8 @@ case class ShuffledHashJoinExec( } // When we reach here, it means no match is found for this key. // So we need to return one row with build side NULL row, - // to satisfy the full outer join semantic. - if (!found) { + // to satisfy the full outer join semantic if enabled. + if (!found && isFullOuterJoin) { joinRowWithBuild(buildNullRow) // Set `found` to be true as we only need to return one row // but no more. @@ -314,6 +340,8 @@ case class ShuffledHashJoinExec( override def supportCodegen: Boolean = joinType match { case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN) + case LeftOuter if buildSide == BuildLeft => false + case RightOuter if buildSide == BuildRight => false case _ => true } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala index 1792b4c32eb11..7af826583bd45 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala @@ -489,10 +489,16 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP assertShuffleHashJoin( sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft) assertShuffleHashJoin( - sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight) + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildLeft) assertShuffleHashJoin( sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "right")), BuildLeft) + // Determine build side based on hint + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildLeft) + assertShuffleHashJoin( + sql(equiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: Nil, "right")), BuildRight) + // Shuffle-hash hint prioritized over shuffle-replicate-nl hint assertShuffleHashJoin( sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: "SHUFFLE_HASH(t1)" :: Nil)), @@ -507,8 +513,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP BuildLeft) // Shuffle-hash hint specified but not doable - assertBroadcastHashJoin( - sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildRight) assertBroadcastNLJoin( sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil)), BuildLeft) } @@ -606,13 +610,25 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP withLogAppender(hintAppender, level = Some(Level.WARN)) { assertShuffleMergeJoin( df1.hint("BROADCAST").join(df2, $"a1" === $"b1", joinType)) + } + + val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage) + .filter(_.contains("is not supported in the query:")) + assert(logs.size === 1) + logs.foreach(log => + assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join."))) + } + + Seq("left_semi", "left_anti").foreach { joinType => + val hintAppender = new LogAppender(s"join hint build side check for $joinType") + withLogAppender(hintAppender, level = Some(Level.WARN)) { assertShuffleMergeJoin( df1.hint("SHUFFLE_HASH").join(df2, $"a1" === $"b1", joinType)) } val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage) .filter(_.contains("is not supported in the query:")) - assert(logs.size === 2) + assert(logs.size === 1) logs.foreach(log => assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join."))) } @@ -622,8 +638,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP withLogAppender(hintAppender, level = Some(Level.WARN)) { assertBroadcastHashJoin( df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType), BuildRight) - assertShuffleHashJoin( - df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType), BuildRight) } val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage) @@ -636,12 +650,10 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP withLogAppender(hintAppender, level = Some(Level.WARN)) { assertShuffleMergeJoin( df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType)) - assertShuffleMergeJoin( - df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType)) } val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage) .filter(_.contains("is not supported in the query:")) - assert(logs.size === 2) + assert(logs.size === 1) logs.foreach(log => assert(log.contains(s"build right for ${joinType.split("_").mkString(" ")} join."))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index f263c68693693..9ea4360810e1f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1250,6 +1250,83 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan } } + test("SPARK-36612: Support left outer join build left or right outer join build right in " + + "shuffled hash join") { + val inputDFs = Seq( + // Test unique join key + (spark.range(10).selectExpr("id as k1"), + spark.range(30).selectExpr("id as k2"), + $"k1" === $"k2"), + // Test non-unique join key + (spark.range(10).selectExpr("id % 5 as k1"), + spark.range(30).selectExpr("id % 5 as k2"), + $"k1" === $"k2"), + // Test empty build side + (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), + spark.range(30).selectExpr("id as k2"), + $"k1" === $"k2"), + // Test empty stream side + (spark.range(10).selectExpr("id as k1"), + spark.range(30).selectExpr("id as k2").filter("k2 < -1"), + $"k1" === $"k2"), + // Test empty build and stream side + (spark.range(10).selectExpr("id as k1").filter("k1 < -1"), + spark.range(30).selectExpr("id as k2").filter("k2 < -1"), + $"k1" === $"k2"), + // Test string join key + (spark.range(10).selectExpr("cast(id * 3 as string) as k1"), + spark.range(30).selectExpr("cast(id as string) as k2"), + $"k1" === $"k2"), + // Test build side at right + (spark.range(30).selectExpr("cast(id / 3 as string) as k1"), + spark.range(10).selectExpr("cast(id as string) as k2"), + $"k1" === $"k2"), + // Test NULL join key + (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"), + spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"), + $"k1" === $"k2"), + (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"), + spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"), + $"k1" === $"k2"), + // Test multiple join keys + (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr( + "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"), + spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr( + "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"), + $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6") + ) + + // test left outer with left side build + inputDFs.foreach { case (df1, df2, joinExprs) => + val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) + } + + // test right outer with right side build + inputDFs.foreach { case (df2, df1, joinExprs) => + val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter") + assert(collect(smjDF.queryExecution.executedPlan) { + case _: SortMergeJoinExec => true }.size === 1) + val smjResult = smjDF.collect() + + val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter") + assert(collect(shjDF.queryExecution.executedPlan) { + case _: ShuffledHashJoinExec => true + }.size === 1) + // Same result between shuffled hash join and sort merge join + checkAnswer(shjDF, smjResult) + } + } + test("SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation") { val inputDFs = Seq( // Test empty build side for inner join