diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 796e7d8f89d1a..f15ab7047566a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -374,14 +374,14 @@ trait JoinSelectionHelper {
 
   def canBuildShuffledHashJoinLeft(joinType: JoinType): Boolean = {
     joinType match {
-      case _: InnerLike | RightOuter | FullOuter => true
+      case _: InnerLike | LeftOuter | FullOuter | RightOuter => true
       case _ => false
     }
   }
 
   def canBuildShuffledHashJoinRight(joinType: JoinType): Boolean = {
     joinType match {
-      case _: InnerLike | LeftOuter | FullOuter |
+      case _: InnerLike | LeftOuter | FullOuter | RightOuter |
            LeftSemi | LeftAnti | _: ExistenceJoin => true
       case _ => false
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index 4d3e63282fabf..16345bb35db2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -127,7 +127,6 @@ private[execution] object HashedRelation {
    * Create a HashedRelation from an Iterator of InternalRow.
    *
    * @param allowsNullKey Allow NULL keys in HashedRelation.
-   *                      This is used for full outer join in `ShuffledHashJoinExec` only.
    * @param ignoresDuplicatedKey Ignore rows with duplicated keys in HashedRelation.
    *                             This is only used for semi and anti join without join condition in
    *                             `ShuffledHashJoinExec` only.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index cfe35d04778fb..8953bf19f35ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -56,7 +56,12 @@ case class ShuffledHashJoinExec(
   override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning
 
   override def outputOrdering: Seq[SortOrder] = joinType match {
+    // For outer joins where the outer side is the build side, output order cannot be guaranteed.
+    // The algorithm performs an additional, unordered iteration over the build side
+    // (HashedRelation) to find unmatched rows and satisfy the outer join semantics.
     case FullOuter => Nil
+    case LeftOuter if buildSide == BuildLeft => Nil
+    case RightOuter if buildSide == BuildRight => Nil
     case _ => super.outputOrdering
   }
 
@@ -83,8 +88,10 @@ case class ShuffledHashJoinExec(
       iter,
       buildBoundKeys,
       taskMemoryManager = context.taskMemoryManager(),
-      // Full outer join needs support for NULL key in HashedRelation.
-      allowsNullKey = joinType == FullOuter,
+      // Full outer join and build-side outer join need support for NULL key in HashedRelation.
+      allowsNullKey = joinType == FullOuter ||
+        (joinType == LeftOuter && buildSide == BuildLeft) ||
+        (joinType == RightOuter && buildSide == BuildRight),
       ignoresDuplicatedKey = ignoreDuplicatedKey)
     buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
     buildDataSize += relation.estimatedSize
@@ -98,16 +105,22 @@ case class ShuffledHashJoinExec(
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
       val hashed = buildHashedRelation(buildIter)
       joinType match {
-        case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+        case FullOuter => buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows,
+          isFullOuterJoin = true)
+        case LeftOuter if buildSide.equals(BuildLeft) =>
+          buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false)
+        case RightOuter if buildSide.equals(BuildRight) =>
+          buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false)
         case _ => join(streamIter, hashed, numOutputRows)
       }
     }
   }
 
-  private def fullOuterJoin(
+  private def buildSideOrFullOuterJoin(
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation,
-      numOutputRows: SQLMetric): Iterator[InternalRow] = {
+      numOutputRows: SQLMetric,
+      isFullOuterJoin: Boolean): Iterator[InternalRow] = {
     val joinKeys = streamSideKeyGenerator()
     val joinRow = new JoinedRow
     val (joinRowWithStream, joinRowWithBuild) = {
@@ -130,11 +143,11 @@ case class ShuffledHashJoinExec(
     }
 
     val iter = if (hashedRelation.keyIsUnique) {
-      fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
+      buildSideOrFullOuterJoinUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin)
     } else {
-      fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
+      buildSideOrFullOuterJoinNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+        joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin)
     }
 
     val resultProj = UnsafeProjection.create(output, output)
@@ -145,31 +158,38 @@ case class ShuffledHashJoinExec(
   }
 
   /**
-   * Full outer shuffled hash join with unique join keys:
+   * Shuffled hash join with unique join keys, where an outer side is the build side:
    * 1. Process rows from stream side by looking up hash relation.
    *    Mark the matched rows from build side be looked up.
    *    A bit set is used to track matched rows with key index.
    * 2. Process rows from build side by iterating hash relation.
    *    Filter out rows from build side being matched already,
    *    by checking key index from bit set.
   */
-  private def fullOuterJoinWithUniqueKey(
+  private def buildSideOrFullOuterJoinUniqueKey(
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation,
       joinKeys: UnsafeProjection,
       joinRowWithStream: InternalRow => JoinedRow,
       joinRowWithBuild: InternalRow => JoinedRow,
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-      buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
+      buildNullRow: GenericInternalRow,
+      isFullOuterJoin: Boolean): Iterator[InternalRow] = {
     val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
     longMetric("buildDataSize") += matchedKeys.capacity / 8
 
+    def noMatch = if (isFullOuterJoin) {
+      Some(joinRowWithBuild(buildNullRow))
+    } else {
+      None
+    }
+
     // Process stream side with looking up hash relation
-    val streamResultIter = streamIter.map { srow =>
+    val streamResultIter = streamIter.flatMap { srow =>
       joinRowWithStream(srow)
       val keys = joinKeys(srow)
       if (keys.anyNull) {
-        joinRowWithBuild(buildNullRow)
+        noMatch
       } else {
         val matched = hashedRelation.getValueWithKeyIndex(keys)
         if (matched != null) {
@@ -178,12 +198,12 @@ case class ShuffledHashJoinExec(
           val joinRow = joinRowWithBuild(buildRow)
           if (boundCondition(joinRow)) {
             matchedKeys.set(keyIndex)
-            joinRow
+            Some(joinRow)
           } else {
-            joinRowWithBuild(buildNullRow)
+            noMatch
           }
         } else {
-          joinRowWithBuild(buildNullRow)
+          noMatch
         }
       }
     }
@@ -205,7 +225,7 @@ case class ShuffledHashJoinExec(
   }
 
   /**
-   * Full outer shuffled hash join with non-unique join keys:
+   * Shuffled hash join with non-unique join keys, where an outer side is the build side:
    * 1. Process rows from stream side by looking up hash relation.
    *    Mark the matched rows from build side be looked up.
    *    A [[OpenHashSet]] (Long) is used to track matched rows with
@@ -219,14 +239,15 @@ case class ShuffledHashJoinExec(
    *    the value indices of its tuples will be 0, 1 and 2.
    *    Note that value indices of tuples with different keys are incomparable.
    */
-  private def fullOuterJoinWithNonUniqueKey(
+  private def buildSideOrFullOuterJoinNonUniqueKey(
       streamIter: Iterator[InternalRow],
       hashedRelation: HashedRelation,
       joinKeys: UnsafeProjection,
       joinRowWithStream: InternalRow => JoinedRow,
       joinRowWithBuild: InternalRow => JoinedRow,
       streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-      buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
+      buildNullRow: GenericInternalRow,
+      isFullOuterJoin: Boolean): Iterator[InternalRow] = {
     val matchedRows = new OpenHashSet[Long]
     TaskContext.get().addTaskCompletionListener[Unit](_ => {
       // At the end of the task, update the task's memory usage for this
@@ -252,7 +273,12 @@ case class ShuffledHashJoinExec(
       val joinRow = joinRowWithStream(srow)
       val keys = joinKeys(srow)
       if (keys.anyNull) {
-        Iterator.single(joinRowWithBuild(buildNullRow))
+        // Return row with build side NULL row to satisfy full outer join semantics, if enabled
+        if (isFullOuterJoin) {
+          Iterator.single(joinRowWithBuild(buildNullRow))
+        } else {
+          Iterator.empty
+        }
       } else {
         val buildIter = hashedRelation.getWithKeyIndex(keys)
         new RowIterator {
@@ -272,8 +298,8 @@ case class ShuffledHashJoinExec(
             }
             // When we reach here, it means no match is found for this key.
             // So we need to return one row with build side NULL row,
-            // to satisfy the full outer join semantic.
-            if (!found) {
+            // to satisfy the full outer join semantic if enabled.
+            if (!found && isFullOuterJoin) {
               joinRowWithBuild(buildNullRow)
               // Set `found` to be true as we only need to return one row
               // but no more.
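Both rewritten join procedures above share the same two-phase shape: probe the stream side against the HashedRelation while recording which build rows have matched (a BitSet of key indices when keys are unique, an OpenHashSet[Long] of packed key and value indices otherwise), then make a second, unordered pass over the relation to emit the still-unmatched build rows padded with NULLs. Below is a minimal, self-contained Scala sketch of that shape on plain collections; `Row`, `buildSideOuterJoin`, and every other name in it are illustrative stand-ins rather than Spark APIs, and join conditions, metrics, and spilling are elided:

```scala
import scala.collection.mutable

// Illustrative stand-in for an InternalRow with a possibly-NULL join key.
case class Row(key: Option[Int], value: String)

// Two-phase sketch of the build-side outer join: probe the stream side and
// mark every matched build row, then scan the build side for the rest.
def buildSideOuterJoin(
    stream: Iterator[Row],
    build: IndexedSeq[Row],
    isFullOuterJoin: Boolean): Iterator[(Option[Row], Option[Row])] = {
  // Stand-in for HashedRelation: join key -> indices of build rows. Build rows
  // with a NULL key are kept around (this is what allowsNullKey enables).
  val relation = mutable.HashMap.empty[Int, mutable.ArrayBuffer[Int]]
  for ((row, i) <- build.zipWithIndex; k <- row.key) {
    relation.getOrElseUpdate(k, mutable.ArrayBuffer.empty) += i
  }
  // Stand-in for the BitSet / OpenHashSet of matched build-row indices.
  val matched = mutable.BitSet.empty

  // Phase 1: probe with the stream side, marking build matches as we go.
  val streamed = stream.flatMap { srow =>
    val hits = srow.key.toList.flatMap(k => relation.getOrElse(k, Nil))
    if (hits.nonEmpty) {
      hits.iterator.map { i => matched += i; (Some(srow), Some(build(i))) }
    } else if (isFullOuterJoin) {
      // Only full outer join keeps an unmatched stream row (NULL build side).
      Iterator.single((Some(srow), None))
    } else {
      Iterator.empty // outer join building the outer side drops this row
    }
  }

  // Phase 2: unordered scan of the build side for rows never marked in
  // phase 1, padded with a NULL stream side. This extra pass is why no
  // output ordering can be promised for these join types.
  val unmatchedBuild = build.indices.iterator
    .filterNot(matched)
    .map(i => (Option.empty[Row], Some(build(i))))

  streamed ++ unmatchedBuild // lazy: phase 2 runs after phase 1 is drained
}

// Example: left outer join with the left side as the build side.
val buildRows = Vector(Row(Some(1), "b1"), Row(Some(1), "b2"), Row(None, "b3"))
val streamRows = Iterator(Row(Some(1), "s1"), Row(Some(2), "s2"))
buildSideOuterJoin(streamRows, buildRows, isFullOuterJoin = false).foreach(println)
// (Some(Row(Some(1),s1)),Some(Row(Some(1),b1)))
// (Some(Row(Some(1),s1)),Some(Row(Some(1),b2)))
// (None,Some(Row(None,b3)))
```

That second pass visits the relation in hash order, which is why outputOrdering above returns Nil for these cases, and presumably why the supportCodegen hunk below disables whole-stage codegen for them as well.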
@@ -314,6 +340,8 @@ case class ShuffledHashJoinExec(
 
   override def supportCodegen: Boolean = joinType match {
     case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN)
+    case LeftOuter if buildSide == BuildLeft => false
+    case RightOuter if buildSide == BuildRight => false
     case _ => true
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
index 1792b4c32eb11..7af826583bd45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
@@ -489,10 +489,16 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
       assertShuffleHashJoin(
         sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft)
       assertShuffleHashJoin(
-        sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight)
+        sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildLeft)
       assertShuffleHashJoin(
         sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "right")), BuildLeft)
+      // Determine build side based on hint
+      assertShuffleHashJoin(
+        sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildLeft)
+      assertShuffleHashJoin(
+        sql(equiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: Nil, "right")), BuildRight)
+
       // Shuffle-hash hint prioritized over shuffle-replicate-nl hint
       assertShuffleHashJoin(
         sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: "SHUFFLE_HASH(t1)" :: Nil)),
         BuildLeft)
@@ -507,8 +513,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
         BuildLeft)
 
       // Shuffle-hash hint specified but not doable
-      assertBroadcastHashJoin(
-        sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildRight)
       assertBroadcastNLJoin(
         sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil)), BuildLeft)
     }
@@ -606,13 +610,25 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
       withLogAppender(hintAppender, level = Some(Level.WARN)) {
         assertShuffleMergeJoin(
           df1.hint("BROADCAST").join(df2, $"a1" === $"b1", joinType))
+      }
+
+      val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+        .filter(_.contains("is not supported in the query:"))
+      assert(logs.size === 1)
+      logs.foreach(log =>
+        assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
+    }
+
+    Seq("left_semi", "left_anti").foreach { joinType =>
+      val hintAppender = new LogAppender(s"join hint build side check for $joinType")
+      withLogAppender(hintAppender, level = Some(Level.WARN)) {
         assertShuffleMergeJoin(
           df1.hint("SHUFFLE_HASH").join(df2, $"a1" === $"b1", joinType))
       }
       val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
         .filter(_.contains("is not supported in the query:"))
-      assert(logs.size === 2)
+      assert(logs.size === 1)
       logs.foreach(log =>
         assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
     }
 
@@ -622,8 +638,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP
       withLogAppender(hintAppender, level = Some(Level.WARN)) {
         assertBroadcastHashJoin(
           df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType), BuildRight)
-        assertShuffleHashJoin(
-          df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType), BuildRight)
       }
 
       val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
@@ -636,12 +650,10 @@
       withLogAppender(hintAppender, level = Some(Level.WARN)) {
         assertShuffleMergeJoin(
           df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType))
-        assertShuffleMergeJoin(
-          df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType))
       }
       val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
         .filter(_.contains("is not supported in the query:"))
-      assert(logs.size === 2)
+      assert(logs.size === 1)
       logs.foreach(log =>
         assert(log.contains(s"build right for ${joinType.split("_").mkString(" ")} join.")))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index f263c68693693..9ea4360810e1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1250,6 +1250,83 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     }
   }
 
+  test("SPARK-36612: Support left outer join build left or right outer join build right in " +
+    "shuffled hash join") {
+    val inputDFs = Seq(
+      // Test unique join key
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test non-unique join key
+      (spark.range(10).selectExpr("id % 5 as k1"),
+        spark.range(30).selectExpr("id % 5 as k2"),
+        $"k1" === $"k2"),
+      // Test empty build side
+      (spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
+        spark.range(30).selectExpr("id as k2"),
+        $"k1" === $"k2"),
+      // Test empty stream side
+      (spark.range(10).selectExpr("id as k1"),
+        spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
+        $"k1" === $"k2"),
+      // Test empty build and stream side
+      (spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
+        spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
+        $"k1" === $"k2"),
+      // Test string join key
+      (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+        spark.range(30).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test build side at right
+      (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+        spark.range(10).selectExpr("cast(id as string) as k2"),
+        $"k1" === $"k2"),
+      // Test NULL join key
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+        spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"),
+        spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"),
+        $"k1" === $"k2"),
+      // Test multiple join keys
+      (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+        "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"),
+        spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr(
+          "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"),
+        $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+    )
+
+    // test left outer with left side build
+    inputDFs.foreach { case (df1, df2, joinExprs) =>
+      val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter")
+      assert(collect(smjDF.queryExecution.executedPlan) {
+        case _: SortMergeJoinExec => true }.size === 1)
+      val smjResult = smjDF.collect()
+
+      val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter")
+      assert(collect(shjDF.queryExecution.executedPlan) {
+        case _: ShuffledHashJoinExec => true
+      }.size === 1)
+      // Same result between shuffled hash join and sort merge join
+      checkAnswer(shjDF, smjResult)
+    }
+
+    // test right outer with right side build
+    inputDFs.foreach { case (df2, df1, joinExprs) =>
+      val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter")
+      assert(collect(smjDF.queryExecution.executedPlan) {
+        case _: SortMergeJoinExec => true }.size === 1)
+      val smjResult = smjDF.collect()
+
+      val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter")
+      assert(collect(shjDF.queryExecution.executedPlan) {
+        case _: ShuffledHashJoinExec => true
+      }.size === 1)
+      // Same result between shuffled hash join and sort merge join
+      checkAnswer(shjDF, smjResult)
+    }
+  }
+
   test("SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation") {
     val inputDFs = Seq(
       // Test empty build side for inner join
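For reference, the behavior the new tests pin down can be reproduced in a few lines of user code. The sketch below is hypothetical spark-shell style usage, assuming a Spark build that includes this patch; AQE is disabled only so the join node is visible in executedPlan before the query runs:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec

val spark = SparkSession.builder().master("local[2]").appName("shj-outer-demo").getOrCreate()
import spark.implicits._

// Keep the join node visible in executedPlan for this sketch; with AQE on,
// the plan is wrapped in AdaptiveSparkPlanExec until the query executes.
spark.conf.set("spark.sql.adaptive.enabled", "false")

val left = spark.range(10).selectExpr("id % 5 as k1")
val right = spark.range(30).selectExpr("id % 5 as k2")

// Left outer join with the SHUFFLE_HASH hint on the left (outer) side:
// with this patch the planner honors the hint and builds on the left.
val df = left.hint("SHUFFLE_HASH").join(right, $"k1" === $"k2", "leftouter")
val shj = df.queryExecution.executedPlan.collect { case j: ShuffledHashJoinExec => j }
assert(shj.nonEmpty && shj.head.buildSide == BuildLeft)

// Mirror image, matching the second half of the new JoinSuite test.
val df2 = right.join(left.hint("SHUFFLE_HASH"), $"k2" === $"k1", "rightouter")
val shj2 = df2.queryExecution.executedPlan.collect { case j: ShuffledHashJoinExec => j }
assert(shj2.nonEmpty && shj2.head.buildSide == BuildRight)
```

Before this change, canBuildShuffledHashJoinLeft rejected LeftOuter, so the same hint could not be honored and the planner fell back to another strategy (the assertBroadcastHashJoin case deleted from JoinHintSuite above).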