
Commit 6ef57d3

c21 authored and cloud-fan committed
[SPARK-34514][SQL] Push down limit for LEFT SEMI and LEFT ANTI join
### What changes were proposed in this pull request?

During code review of #31567 (comment), I found that we can push down limit to the left side of LEFT SEMI and LEFT ANTI join if the join condition is empty.

Why it's safe to push down limit:

The semantics of LEFT SEMI join without condition:
(1). if right side is non-empty, output all rows from left side.
(2). if right side is empty, output nothing.

The semantics of LEFT ANTI join without condition:
(1). if right side is non-empty, output nothing.
(2). if right side is empty, output all rows from left side.

Because the output is either all rows from the left side or nothing (all or nothing), it's safe to push down limit to the left side.
NOTE: LEFT SEMI / LEFT ANTI join with a non-empty condition is not safe for limit push down, because the output can be a portion of the left side rows.

Reference: physical operator implementation for LEFT SEMI / LEFT ANTI join without condition - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala#L200-L204 .

### Why are the changes needed?

Better performance. Saves CPU and IO for these joins, as the limit is pushed down before the join.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests in `LimitPushdownSuite.scala` and `SQLQuerySuite.scala`.

Closes #31630 from c21/limit-pushdown.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
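The all-or-nothing argument can be checked on plain Scala collections. The following is a minimal sketch, not Spark code: `SemiAntiNoCondition`, `leftSemi`, and `leftAnti` are hypothetical names, tables are modeled as ordered `Seq`s, and `take` stands in for a limit (in Spark, a limit without an ordering may return any qualifying rows).

```scala
object SemiAntiNoCondition {
  // LEFT SEMI join with no condition: keep every left row iff the right side has any row.
  def leftSemi[A, B](left: Seq[A], right: Seq[B]): Seq[A] =
    if (right.nonEmpty) left else Seq.empty

  // LEFT ANTI join with no condition: keep every left row iff the right side is empty.
  def leftAnti[A, B](left: Seq[A], right: Seq[B]): Seq[A] =
    if (right.isEmpty) left else Seq.empty

  def main(args: Array[String]): Unit = {
    val left = Seq(0, 1, 2, 3, 4)
    // Try both a non-empty and an empty right side.
    for (right <- Seq(Seq(10, 11, 12), Seq.empty[Int])) {
      // Limiting after the join gives the same rows as limiting the left side first.
      assert(leftSemi(left, right).take(3) == leftSemi(left.take(3), right))
      assert(leftAnti(left, right).take(3) == leftAnti(left.take(3), right))
    }
    println("limit pushdown preserved the result in all four cases")
  }
}
```

In this model the join either passes the left side through unchanged or drops it entirely, so truncating the left side first cannot change which rows survive the limit.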
1 parent 14934f4 commit 6ef57d3

3 files changed, with 62 additions and 8 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 13 additions & 7 deletions
@@ -502,7 +502,7 @@ object RemoveNoopOperators extends Rule[LogicalPlan] {
 }

 /**
- * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins.
+ * Pushes down [[LocalLimit]] beneath UNION ALL and joins.
  */
 object LimitPushDown extends Rule[LogicalPlan] {

@@ -539,12 +539,16 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // pushdown Limit.
     case LocalLimit(exp, u: Union) =>
       LocalLimit(exp, u.copy(children = u.children.map(maybePushLocalLimit(exp, _))))
-    // Add extra limits below JOIN. For LEFT OUTER and RIGHT OUTER JOIN we push limits to
-    // the left and right sides, respectively. For INNER and CROSS JOIN we push limits to
-    // both the left and right sides if join condition is empty. It's not safe to push limits
-    // below FULL OUTER JOIN in the general case without a more invasive rewrite.
-    // We also need to ensure that this limit pushdown rule will not eventually introduce limits
-    // on both sides if it is applied multiple times. Therefore:
+    // Add extra limits below JOIN:
+    // 1. For LEFT OUTER and RIGHT OUTER JOIN, we push limits to the left and right sides,
+    //    respectively.
+    // 2. For INNER and CROSS JOIN, we push limits to both the left and right sides if join
+    //    condition is empty.
+    // 3. For LEFT SEMI and LEFT ANTI JOIN, we push limits to the left side if join condition
+    //    is empty.
+    // It's not safe to push limits below FULL OUTER JOIN in the general case without a more
+    // invasive rewrite. We also need to ensure that this limit pushdown rule will not eventually
+    // introduce limits on both sides if it is applied multiple times. Therefore:
     //   - If one side is already limited, stack another limit on top if the new limit is smaller.
     //     The redundant limit will be collapsed by the CombineLimits rule.
     case LocalLimit(exp, join @ Join(left, right, joinType, conditionOpt, _)) =>
@@ -555,6 +559,8 @@ object LimitPushDown extends Rule[LogicalPlan] {
           join.copy(
             left = maybePushLocalLimit(exp, left),
             right = maybePushLocalLimit(exp, right))
+        case LeftSemi | LeftAnti if conditionOpt.isEmpty =>
+          join.copy(left = maybePushLocalLimit(exp, left))
         case _ => join
       }
       LocalLimit(exp, newJoin)
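The shape of the rule can be illustrated on a toy plan tree. This is a simplified sketch under stated assumptions, not the Catalyst implementation: every type below (`Plan`, `Relation`, `Join`, the `JoinType` objects) is a hypothetical stand-in defined locally, limits are plain `Int`s rather than expressions, CROSS join is omitted, and `maybePushLocalLimit` only approximates the "stack a limit if the new one is smaller" behavior described in the comment above.

```scala
object ToyLimitPushDown {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  sealed trait Plan
  case class Relation(name: String) extends Plan
  case class LocalLimit(n: Int, child: Plan) extends Plan
  case class Join(left: Plan, right: Plan, joinType: JoinType, condition: Option[String])
    extends Plan

  // Add a limit unless the child already carries one that is at least as tight.
  // This keeps repeated applications of the rule from stacking limits forever.
  def maybePushLocalLimit(n: Int, plan: Plan): Plan = plan match {
    case LocalLimit(m, _) if m <= n => plan
    case _ => LocalLimit(n, plan)
  }

  // One application of the toy rule at the root of the plan.
  def pushDown(plan: Plan): Plan = plan match {
    case LocalLimit(n, join @ Join(left, right, joinType, condition)) =>
      val newJoin = joinType match {
        case LeftOuter => join.copy(left = maybePushLocalLimit(n, left))
        case RightOuter => join.copy(right = maybePushLocalLimit(n, right))
        case Inner if condition.isEmpty =>
          join.copy(
            left = maybePushLocalLimit(n, left),
            right = maybePushLocalLimit(n, right))
        // The case this commit adds: only the left side, only without a condition.
        case LeftSemi | LeftAnti if condition.isEmpty =>
          join.copy(left = maybePushLocalLimit(n, left))
        case _ => join // FULL OUTER, or a semi/anti/inner join with a condition
      }
      LocalLimit(n, newJoin)
    case other => other
  }
}
```

For example, `pushDown(LocalLimit(1, Join(Relation("x"), Relation("y"), LeftSemi, None)))` places a `LocalLimit(1, ...)` on `x` only, while the same plan with `Some("x.a = y.b")` as the condition is returned unchanged.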

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala

Lines changed: 19 additions & 1 deletion
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Add
-import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, PlanTest, RightOuter}
+import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._

@@ -212,4 +212,22 @@ class LimitPushdownSuite extends PlanTest {
       comparePlans(optimized, correctAnswer)
     }
   }
+
+  test("SPARK-34514: Push down limit through LEFT SEMI and LEFT ANTI join") {
+    // Push down when condition is empty
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.join(y, joinType).limit(1)
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer = Limit(1, LocalLimit(1, x).join(y, joinType)).analyze
+      comparePlans(optimized, correctAnswer)
+    }
+
+    // No push down when condition is not empty
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery = x.join(y, joinType, Some("x.a".attr === "y.b".attr)).limit(1)
+      val optimized = Optimize.execute(originalQuery.analyze)
+      val correctAnswer = Limit(1, x.join(y, joinType, Some("x.a".attr === "y.b".attr))).analyze
+      comparePlans(optimized, correctAnswer)
+    }
+  }
 }
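The second half of the test asserts that nothing is pushed down when a join condition is present. A small counterexample on plain collections suggests why; this is an illustrative sketch with hypothetical names (`SemiJoinWithCondition`, `leftSemi`), modeling tables as ordered `Seq`s and a limit as `take`.

```scala
object SemiJoinWithCondition {
  // LEFT SEMI join with a condition: keep left rows that match at least one right row.
  def leftSemi[A, B](left: Seq[A], right: Seq[B])(cond: (A, B) => Boolean): Seq[A] =
    left.filter(l => right.exists(r => cond(l, r)))

  def main(args: Array[String]): Unit = {
    val left = Seq(0, 1, 2, 3, 4)
    val right = Seq(3, 4)

    // Limit applied after the join: the first matching left row survives.
    val limitAfterJoin = leftSemi(left, right)(_ == _).take(1)
    // Limit applied before the join: the single kept row (0) has no match, so nothing survives.
    val limitBeforeJoin = leftSemi(left.take(1), right)(_ == _)

    println(s"limit after join:  $limitAfterJoin")
    println(s"limit before join: $limitBeforeJoin")
    assert(limitAfterJoin != limitBeforeJoin)
  }
}
```

With a condition, the join can output a strict subset of the left rows, so the rows kept by an early limit may all be filtered out later. The same reasoning applies to LEFT ANTI with the match test negated.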

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 30 additions & 0 deletions
@@ -4034,6 +4034,36 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       checkAnswer(df, Row(0, 0) :: Row(0, 1) :: Row(0, 2) :: Nil)
     }
   }
+
+  test("SPARK-34514: Push down limit through LEFT SEMI and LEFT ANTI join") {
+    withTable("left_table", "nonempty_right_table", "empty_right_table") {
+      spark.range(5).toDF().repartition(1).write.saveAsTable("left_table")
+      spark.range(3).write.saveAsTable("nonempty_right_table")
+      spark.range(0).write.saveAsTable("empty_right_table")
+      Seq("LEFT SEMI", "LEFT ANTI").foreach { joinType =>
+        val joinWithNonEmptyRightDf = spark.sql(
+          s"SELECT * FROM left_table $joinType JOIN nonempty_right_table LIMIT 3")
+        val joinWithEmptyRightDf = spark.sql(
+          s"SELECT * FROM left_table $joinType JOIN empty_right_table LIMIT 3")
+
+        Seq(joinWithNonEmptyRightDf, joinWithEmptyRightDf).foreach { df =>
+          val pushedLocalLimits = df.queryExecution.optimizedPlan.collect {
+            case l @ LocalLimit(_, _: LogicalRelation) => l
+          }
+          assert(pushedLocalLimits.length === 1)
+        }
+
+        val expectedAnswer = Seq(Row(0), Row(1), Row(2))
+        if (joinType == "LEFT SEMI") {
+          checkAnswer(joinWithNonEmptyRightDf, expectedAnswer)
+          checkAnswer(joinWithEmptyRightDf, Seq.empty)
+        } else {
+          checkAnswer(joinWithNonEmptyRightDf, Seq.empty)
+          checkAnswer(joinWithEmptyRightDf, expectedAnswer)
+        }
+      }
+    }
+  }
 }

 case class Foo(bar: Option[String])
