Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.adaptive

import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
Expand All @@ -31,13 +31,20 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
* 2. Join is inner join, and either side of join is empty. Eliminate join to an empty
* [[LocalRelation]].
*
* 3. Join is left semi join
* 3.1. Join right side is empty. Eliminate join to an empty [[LocalRelation]].
* 3.2. Join right side is non-empty and condition is empty. Eliminate join to its left side.
* 3. Join is outer join. Eliminate join to empty [[LocalRelation]] if:
* 3.1. Left outer if left side is empty
* 3.2. Right outer if right side is empty
* 3.3. Full outer if both sides are empty
*
* 4. Join is left anti join
* 4.1. Join right side is empty. Eliminate join to its left side.
* 4.2. Join right side is non-empty and condition is empty. Eliminate join to an empty
* 4. Join is left semi join
* 4.1. Join left side is empty. Eliminate join to an empty [[LocalRelation]].
* 4.2. Join right side is empty. Eliminate join to an empty [[LocalRelation]].
* 4.3. Join right side is non-empty and condition is empty. Eliminate join to its left side.
*
* 5. Join is left anti join
* 5.1. Join left side is empty. Eliminate join to an empty [[LocalRelation]].
* 5.2. Join right side is empty. Eliminate join to its left side.
* 5.3. Join right side is non-empty and condition is empty. Eliminate join to an empty
* [[LocalRelation]].
*
* This applies to all joins (sort merge join, shuffled hash join, broadcast hash join, and
Expand All @@ -59,19 +66,33 @@ object EliminateUnnecessaryJoin extends Rule[LogicalPlan] {
case Some(count) => hasRow == (count > 0)
case _ => false
}

case LocalRelation(_, data, isStreaming) if !isStreaming =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to handle LocalRelation, then it's not AQE specific and we can do it in the normal optimizer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but currently we have no chance to run the normal optimizer on the AQE side. Maybe in the future we can make some of the rules in Optimizer also available in AQEOptimizer?

Copy link
Contributor

@cloud-fan cloud-fan May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since AQE is on by default, it's not a big issue but more about code cleanness. How about this:

  1. EliminateUnnecessaryJoin should only deal with LocalRelation, and appear in both the normal optimizer and the AQE optimizer
  2. AQE optimizer adds a new rule to turn empty query stage into empty LocalRelation
  3. AQE optimizer adds a new rule to deal with non empty query stage for left semi/anti joins

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM, and I just found an existing rule, PropagateEmptyRelation.

How about this update?

  • Add a rule ConvertToLocalRelation to turn empty query stage into empty LocalRelation
  • Make PropagateEmptyRelation appear in the AQE optimizer
  • Reduce EliminateUnnecessaryJoin to only handle naaj/semi/anti joins

data.nonEmpty == hasRow

case _ => false
}

def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys(j.right) =>
LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)

case j @ Join(_, _, Inner, _, _) if checkRowCount(j.left, hasRow = false) ||
checkRowCount(j.right, hasRow = false) =>
LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)

case j @ Join(_, _, LeftOuter, _, _) if checkRowCount(j.left, hasRow = false) =>
LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)

case j @ Join(_, _, RightOuter, _, _) if checkRowCount(j.right, hasRow = false) =>
LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)

case j @ Join(_, _, FullOuter, _, _) if checkRowCount(j.left, hasRow = false) &&
checkRowCount(j.right, hasRow = false) =>
LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)

case j @ Join(_, _, LeftSemi, condition, _) =>
if (checkRowCount(j.right, hasRow = false)) {
if (checkRowCount(j.left, hasRow = false) || checkRowCount(j.right, hasRow = false)) {
LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
} else if (condition.isEmpty && checkRowCount(j.right, hasRow = true)) {
j.left
Expand All @@ -80,7 +101,9 @@ object EliminateUnnecessaryJoin extends Rule[LogicalPlan] {
}

case j @ Join(_, _, LeftAnti, condition, _) =>
if (checkRowCount(j.right, hasRow = false)) {
if (checkRowCount(j.left, hasRow = false)) {
LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
} else if (checkRowCount(j.right, hasRow = false)) {
j.left
} else if (condition.isEmpty && checkRowCount(j.right, hasRow = true)) {
LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ class AdaptiveQueryExecSuite
test("Empty stage coalesced to 1-partition RDD") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") {
SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key ->
EliminateUnnecessaryJoin.getClass.getName.stripSuffix("$")) {
val df1 = spark.range(10).withColumn("a", 'id)
val df2 = spark.range(10).withColumn("b", 'id)
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Expand Down Expand Up @@ -1307,6 +1309,74 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-35455: Enhance EliminateUnnecessaryJoin - single join") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's update the test name and PR title: Unify empty relation optimization between normal and AQE optimizer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it and also updated the PR title.

withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Seq(
// left semi join and empty left side
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't optimize this before this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we can't. Before this PR we only checked the right side for LeftSemi/LeftAnti.

Also, the test should filter and join on different columns, in case InferFiltersFromConstraints makes the right side empty. Updated it.

("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 LEFT SEMI JOIN testData2 t2 ON " +
"t1.key = t2.a", true),
// left anti join and empty left side
("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 LEFT ANTI JOIN testData2 t2 ON " +
"t1.key = t2.a", true),
// left outer join and empty left side
("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 LEFT JOIN testData2 t2 ON " +
"t1.key = t2.a", true),
// left outer join and non-empty left side
("SELECT * FROM testData t1 LEFT JOIN testData2 t2 ON " +
"t1.key = t2.a", false),
// right outer join and empty right side
("SELECT * FROM testData t1 RIGHT JOIN (SELECT * FROM testData2 WHERE b = 0)t2 ON " +
"t1.key = t2.a", true),
// right outer join and non-empty right side
("SELECT * FROM testData t1 RIGHT JOIN testData2 t2 ON " +
"t1.key = t2.a", false),
// full outer join and both side empty
("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 FULL JOIN " +
"(SELECT * FROM testData2 WHERE b = 0)t2 ON t1.key = t2.a", true),
// full outer join and left side empty right side non-empty
("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 FULL JOIN " +
"testData2 t2 ON t1.key = t2.a", false)
).foreach { case (query, isEliminated) =>
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
assert(findTopLevelBaseJoin(plan).size == 1)
assert(findTopLevelBaseJoin(adaptivePlan).isEmpty == isEliminated)
}
}
}

test("SPARK-35455: Enhance EliminateUnnecessaryJoin - multi join") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
Seq(
("""
|SELECT * FROM testData t1
| JOIN (SELECT * FROM testData2 WHERE b = 0) t2 ON t1.key = t2.a
| LEFT JOIN testData2 t3 ON t1.key = t3.a
|""".stripMargin, 0),
("""
|SELECT * FROM (SELECT * FROM testData WHERE key = 0) t1
| LEFT ANTI JOIN testData2 t2
| FULL JOIN (SELECT * FROM testData2 WHERE b = 0) t3 ON t1.key = t3.a
|""".stripMargin, 0),
("""
|SELECT * FROM testData t1
| LEFT SEMI JOIN (SELECT * FROM testData2 WHERE b = 0)
| RIGHT JOIN testData2
|""".stripMargin, 1),
("""
|SELECT * FROM testData t1
| FULL JOIN (SELECT * FROM testData2 WHERE b = 0) t1
| FULL JOIN (SELECT * FROM testData WHERE key = 0) t2
|""".stripMargin, 2)
).foreach { case (query, joinNum) =>
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
assert(findTopLevelBaseJoin(plan).size == 2)
assert(findTopLevelBaseJoin(adaptivePlan).size == joinNum)
}
}
}

test("SPARK-32753: Only copy tags to node with no tags") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
withTempView("v1") {
Expand Down