From 4514d278ae4e0ae3c64abcb1c7c55cbd0c4181db Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Thu, 20 May 2021 13:42:27 +0800
Subject: [PATCH 01/23] Enhance EliminateUnnecessaryJoin

---
 .../adaptive/EliminateUnnecessaryJoin.scala   | 43 +++++++++---
 .../adaptive/AdaptiveQueryExecSuite.scala     | 68 +++++++++++++++++++
 2 files changed, 101 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala
index fca56988e520..387952f65169 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
@@ -31,13 +31,20 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
 * 2. Join is inner join, and either side of join is empty. Eliminate join to an empty
 * [[LocalRelation]].
 *
- * 3. Join is left semi join
- * 3.1. Join right side is empty. Eliminate join to an empty [[LocalRelation]].
- * 3.2. Join right side is non-empty and condition is empty. Eliminate join to its left side.
+ * 3. Join is outer join. Eliminate join to an empty [[LocalRelation]] if:
+ * 3.1. Left outer if left side is empty
+ * 3.2. Right outer if right side is empty
+ * 3.3. Full outer if both sides are empty
 *
- * 4. Join is left anti join
- * 4.1. Join right side is empty. Eliminate join to its left side.
- * 4.2. Join right side is non-empty and condition is empty. Eliminate join to an empty
+ * 4. Join is left semi join
+ * 4.1. Join left side is empty. Eliminate join to an empty [[LocalRelation]].
+ * 4.2. Join right side is empty. Eliminate join to an empty [[LocalRelation]].
+ * 4.3. Join right side is non-empty and condition is empty. Eliminate join to its left side.
+ *
+ * 5. Join is left anti join
+ * 5.1. Join left side is empty. Eliminate join to an empty [[LocalRelation]].
+ * 5.2. Join right side is empty. Eliminate join to its left side.
+ * 5.3. Join right side is non-empty and condition is empty. Eliminate join to an empty
 * [[LocalRelation]].
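+ *
+ * For example, for rule 3.1: a left outer join whose left side turns out to be empty at
+ * runtime, such as the query used by the test suite below
+ * {{{
+ *   SELECT * FROM (SELECT * FROM testData WHERE key = 0) t1
+ *   LEFT JOIN testData2 t2 ON t1.key = t2.a
+ * }}}
+ * can be replaced by an empty [[LocalRelation]] that keeps the join's output attributes.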
* * This applies to all joins (sort merge join, shuffled hash join, broadcast hash join, and @@ -59,10 +66,14 @@ object EliminateUnnecessaryJoin extends Rule[LogicalPlan] { case Some(count) => hasRow == (count > 0) case _ => false } + + case LocalRelation(_, data, isStreaming) if !isStreaming => + data.nonEmpty == hasRow + case _ => false } - def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { + def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys(j.right) => LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) @@ -70,8 +81,18 @@ object EliminateUnnecessaryJoin extends Rule[LogicalPlan] { checkRowCount(j.right, hasRow = false) => LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) + case j @ Join(_, _, LeftOuter, _, _) if checkRowCount(j.left, hasRow = false) => + LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) + + case j @ Join(_, _, RightOuter, _, _) if checkRowCount(j.right, hasRow = false) => + LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) + + case j @ Join(_, _, FullOuter, _, _) if checkRowCount(j.left, hasRow = false) && + checkRowCount(j.right, hasRow = false) => + LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) + case j @ Join(_, _, LeftSemi, condition, _) => - if (checkRowCount(j.right, hasRow = false)) { + if (checkRowCount(j.left, hasRow = false) || checkRowCount(j.right, hasRow = false)) { LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) } else if (condition.isEmpty && checkRowCount(j.right, hasRow = true)) { j.left @@ -80,7 +101,9 @@ object EliminateUnnecessaryJoin extends Rule[LogicalPlan] { } case j @ Join(_, _, LeftAnti, condition, _) => - if (checkRowCount(j.right, hasRow = false)) { + if (checkRowCount(j.left, hasRow = false)) { + LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) + } else if (checkRowCount(j.right, hasRow = false)) { j.left } else if (condition.isEmpty && checkRowCount(j.right, hasRow = true)) { LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 454d3aa148a4..0f8367564d76 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1307,6 +1307,74 @@ class AdaptiveQueryExecSuite } } + test("SPARK-35455: Enhance EliminateUnnecessaryJoin - single join") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + Seq( + // left semi join and empty left side + ("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 LEFT SEMI JOIN testData2 t2 ON " + + "t1.key = t2.a", true), + // left anti join and empty left side + ("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 LEFT ANTI JOIN testData2 t2 ON " + + "t1.key = t2.a", true), + // left outer join and empty left side + ("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 LEFT JOIN testData2 t2 ON " + + "t1.key = t2.a", true), + // left outer join and non-empty left side + ("SELECT * FROM testData t1 LEFT JOIN testData2 t2 ON " + + "t1.key = t2.a", false), + // right outer join and empty right side + ("SELECT * FROM 
testData t1 RIGHT JOIN (SELECT * FROM testData2 WHERE b = 0)t2 ON " + + "t1.key = t2.a", true), + // right outer join and non-empty right side + ("SELECT * FROM testData t1 RIGHT JOIN testData2 t2 ON " + + "t1.key = t2.a", false), + // full outer join and both side empty + ("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 FULL JOIN " + + "(SELECT * FROM testData2 WHERE b = 0)t2 ON t1.key = t2.a", true), + // full outer join and left side empty right side non-empty + ("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 FULL JOIN " + + "testData2 t2 ON t1.key = t2.a", false) + ).foreach { case (query, isEliminated) => + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query) + assert(findTopLevelBaseJoin(plan).size == 1) + assert(findTopLevelBaseJoin(adaptivePlan).isEmpty == isEliminated) + } + } + } + + test("SPARK-35455: Enhance EliminateUnnecessaryJoin - multi join") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + Seq( + (""" + |SELECT * FROM testData t1 + | JOIN (SELECT * FROM testData2 WHERE b = 0) t2 ON t1.key = t2.a + | LEFT JOIN testData2 t3 ON t1.key = t3.a + |""".stripMargin, 0), + (""" + |SELECT * FROM (SELECT * FROM testData WHERE key = 0) t1 + | LEFT ANTI JOIN testData2 t2 + | FULL JOIN (SELECT * FROM testData2 WHERE b = 0) t3 ON t1.key = t3.a + |""".stripMargin, 0), + (""" + |SELECT * FROM testData t1 + | LEFT SEMI JOIN (SELECT * FROM testData2 WHERE b = 0) + | RIGHT JOIN testData2 + |""".stripMargin, 1), + (""" + |SELECT * FROM testData t1 + | FULL JOIN (SELECT * FROM testData2 WHERE b = 0) t1 + | FULL JOIN (SELECT * FROM testData WHERE key = 0) t2 + |""".stripMargin, 2) + ).foreach { case (query, joinNum) => + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query) + assert(findTopLevelBaseJoin(plan).size == 2) + assert(findTopLevelBaseJoin(adaptivePlan).size == joinNum) + } + } + } + test("SPARK-32753: Only copy tags to node with no tags") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { withTempView("v1") { From 509724757f59e9b01ccb0774fb6d1cc56e75f9dc Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Thu, 20 May 2021 13:56:44 +0800 Subject: [PATCH 02/23] fix --- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 0f8367564d76..d70bcdcc41fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -236,7 +236,9 @@ class AdaptiveQueryExecSuite test("Empty stage coalesced to 1-partition RDD") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true") { + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> + EliminateUnnecessaryJoin.getClass.getName.stripSuffix("$")) { val df1 = spark.range(10).withColumn("a", 'id) val df2 = spark.range(10).withColumn("b", 'id) withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { From 8df68d93080a700fdca009eedde9d19464522d64 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Thu, 20 May 2021 15:54:31 +0800 Subject: [PATCH 03/23] LocalRelation early --- .../sql/execution/adaptive/AQEOptimizer.scala | 3 +- 
.../adaptive/ConvertToLocalRelation.scala | 32 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ConvertToLocalRelation.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala index a1c35ad27b81..9ddff4657f76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.adaptive +import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LogicalPlanIntegrity, PlanHelper} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.internal.SQLConf @@ -27,7 +28,7 @@ import org.apache.spark.util.Utils */ class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] { private val defaultBatches = Seq( - Batch("Eliminate Unnecessary Join", Once, EliminateUnnecessaryJoin), + Batch("LocalRelation early", Once, Seq(ConvertToLocalRelation, PropagateEmptyRelation)), Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ConvertToLocalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ConvertToLocalRelation.scala new file mode 100644 index 000000000000..d6f2634395a4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ConvertToLocalRelation.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule + +/** + * Converts empty query stage to empty `LocalRelation` + */ +object ConvertToLocalRelation extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case l @ LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined && + stage.getRuntimeStatistics.rowCount.contains(0) => + LocalRelation(l.output, Seq.empty) + } +} From 165077b63b99df3ed20634190ae4df8dd7895ea8 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Thu, 20 May 2021 16:36:19 +0800 Subject: [PATCH 04/23] fix --- .../sql/execution/adaptive/AQEOptimizer.scala | 7 ++- .../adaptive/EliminateUnnecessaryJoin.scala | 57 +++---------------- .../sql/DynamicPartitionPruningSuite.scala | 2 +- .../adaptive/AdaptiveQueryExecSuite.scala | 30 ++++------ 4 files changed, 28 insertions(+), 68 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala index 9ddff4657f76..5913eb79d64b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.adaptive +import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LogicalPlanIntegrity, PlanHelper} import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -28,7 +29,11 @@ import org.apache.spark.util.Utils */ class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] { private val defaultBatches = Seq( - Batch("LocalRelation early", Once, Seq(ConvertToLocalRelation, PropagateEmptyRelation)), + Batch("LocalRelation early", Once, + ConvertToLocalRelation, + EliminateUnnecessaryJoin, + PropagateEmptyRelation, + UpdateAttributeNullability), Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala index 387952f65169..eb6d95eb86e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin -import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter} +import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys @@ -28,28 +28,12 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys * 1. Join is single column NULL-aware anti join (NAAJ), and broadcasted [[HashedRelation]] * is [[HashedRelationWithAllNullKeys]]. Eliminate join to an empty [[LocalRelation]]. * - * 2. 
Join is inner join, and either side of join is empty. Eliminate join to an empty - * [[LocalRelation]]. + * 2. Join is left semi join + * Join right side is non-empty and condition is empty. Eliminate join to its left side. * - * 3. Join is outer join. Eliminate join to empty [[LocalRelation]] if: - * 3.1. Left outer if left side is empty - * 3.2. Right outer if left side is empty - * 3.3. Full outer if both side are empty - * - * 4. Join is left semi join - * 4.1. Join left side is empty. Eliminate join to an empty [[LocalRelation]]. - * 4.2. Join right side is empty. Eliminate join to an empty [[LocalRelation]]. - * 4.3. Join right side is non-empty and condition is empty. Eliminate join to its left side. - * - * 5. Join is left anti join - * 5.1. Join left side is empty. Eliminate join to an empty [[LocalRelation]]. - * 5.2. Join right side is empty. Eliminate join to its left side. - * 5.3. Join right side is non-empty and condition is empty. Eliminate join to an empty + * 3. Join is left anti join + * Join right side is non-empty and condition is empty. Eliminate join to an empty * [[LocalRelation]]. - * - * This applies to all joins (sort merge join, shuffled hash join, broadcast hash join, and - * broadcast nested loop join), because sort merge join and shuffled hash join will be changed - * to broadcast hash join with AQE at the first place. */ object EliminateUnnecessaryJoin extends Rule[LogicalPlan] { @@ -60,16 +44,13 @@ object EliminateUnnecessaryJoin extends Rule[LogicalPlan] { case _ => false } - private def checkRowCount(plan: LogicalPlan, hasRow: Boolean): Boolean = plan match { + private def checkRowCount(plan: LogicalPlan): Boolean = plan match { case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => stage.getRuntimeStatistics.rowCount match { - case Some(count) => hasRow == (count > 0) + case Some(count) => count > 0 case _ => false } - case LocalRelation(_, data, isStreaming) if !isStreaming => - data.nonEmpty == hasRow - case _ => false } @@ -77,35 +58,15 @@ object EliminateUnnecessaryJoin extends Rule[LogicalPlan] { case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys(j.right) => LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - case j @ Join(_, _, Inner, _, _) if checkRowCount(j.left, hasRow = false) || - checkRowCount(j.right, hasRow = false) => - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - - case j @ Join(_, _, LeftOuter, _, _) if checkRowCount(j.left, hasRow = false) => - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - - case j @ Join(_, _, RightOuter, _, _) if checkRowCount(j.right, hasRow = false) => - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - - case j @ Join(_, _, FullOuter, _, _) if checkRowCount(j.left, hasRow = false) && - checkRowCount(j.right, hasRow = false) => - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - case j @ Join(_, _, LeftSemi, condition, _) => - if (checkRowCount(j.left, hasRow = false) || checkRowCount(j.right, hasRow = false)) { - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - } else if (condition.isEmpty && checkRowCount(j.right, hasRow = true)) { + if (condition.isEmpty && checkRowCount(j.right)) { j.left } else { j } case j @ Join(_, _, LeftAnti, condition, _) => - if (checkRowCount(j.left, hasRow = false)) { - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - } else if (checkRowCount(j.right, 
hasRow = false)) { - j.left - } else if (condition.isEmpty && checkRowCount(j.right, hasRow = true)) { + if (condition.isEmpty && checkRowCount(j.right)) { LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) } else { j diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 3b88bd58d925..d2c97706a054 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -1382,7 +1382,7 @@ abstract class DynamicPartitionPruningSuiteBase withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", - SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> EliminateUnnecessaryJoin.ruleName) { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) { val df = sql( """ |SELECT * FROM fact_sk f diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index d70bcdcc41fc..867e5be6b3d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -237,8 +237,7 @@ class AdaptiveQueryExecSuite withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", - SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> - EliminateUnnecessaryJoin.getClass.getName.stripSuffix("$")) { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) { val df1 = spark.range(10).withColumn("a", 'id) val df2 = spark.range(10).withColumn("b", 'id) withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { @@ -1336,11 +1335,11 @@ class AdaptiveQueryExecSuite "(SELECT * FROM testData2 WHERE b = 0)t2 ON t1.key = t2.a", true), // full outer join and left side empty right side non-empty ("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 FULL JOIN " + - "testData2 t2 ON t1.key = t2.a", false) + "testData2 t2 ON t1.key = t2.a", true) ).foreach { case (query, isEliminated) => val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query) assert(findTopLevelBaseJoin(plan).size == 1) - assert(findTopLevelBaseJoin(adaptivePlan).isEmpty == isEliminated) + assert(findTopLevelBaseJoin(adaptivePlan).isEmpty == isEliminated, adaptivePlan) } } } @@ -1349,30 +1348,25 @@ class AdaptiveQueryExecSuite withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { Seq( - (""" + """ |SELECT * FROM testData t1 | JOIN (SELECT * FROM testData2 WHERE b = 0) t2 ON t1.key = t2.a | LEFT JOIN testData2 t3 ON t1.key = t3.a - |""".stripMargin, 0), - (""" + |""".stripMargin, + """ |SELECT * FROM (SELECT * FROM testData WHERE key = 0) t1 | LEFT ANTI JOIN testData2 t2 | FULL JOIN (SELECT * FROM testData2 WHERE b = 0) t3 ON t1.key = t3.a - |""".stripMargin, 0), - (""" + |""".stripMargin, + """ |SELECT * FROM testData t1 | LEFT SEMI JOIN (SELECT * FROM testData2 WHERE b = 0) - | RIGHT JOIN testData2 - |""".stripMargin, 1), - (""" - |SELECT * FROM testData t1 - | FULL JOIN (SELECT * FROM testData2 WHERE b = 0) t1 - | FULL JOIN (SELECT * FROM testData WHERE key = 0) t2 - |""".stripMargin, 2) - 
).foreach { case (query, joinNum) => + | RIGHT JOIN testData2 t3 on t1.key = t3.a + |""".stripMargin + ).foreach { query => val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query) assert(findTopLevelBaseJoin(plan).size == 2) - assert(findTopLevelBaseJoin(adaptivePlan).size == joinNum) + assert(findTopLevelBaseJoin(adaptivePlan).isEmpty) } } } From 2f5fa200fa16af25bfebcf7d0e42cfbb9c8be42a Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 21 May 2021 12:37:14 +0800 Subject: [PATCH 05/23] split PropagateEmptyRelation --- .../sql/catalyst/optimizer/Optimizer.scala | 6 +- .../optimizer/PropagateEmptyRelation.scala | 87 +++++++++++++++---- .../optimizer/OptimizeLimitZeroSuite.scala | 3 +- .../OptimizerRuleExclusionSuite.scala | 3 +- .../PropagateEmptyRelationSuite.scala | 3 +- .../sql/execution/adaptive/AQEOptimizer.scala | 7 +- .../adaptive/ConvertToLocalRelation.scala | 32 ------- .../adaptive/EliminateUnnecessaryJoin.scala | 75 ---------------- ...PropagateEmptyRelationAdvancedHelper.scala | 48 ++++++++++ .../sql/DynamicPartitionPruningSuite.scala | 3 +- .../adaptive/AdaptiveQueryExecSuite.scala | 6 +- 11 files changed, 134 insertions(+), 139 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ConvertToLocalRelation.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 19e9312715db..272575bf13e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -173,7 +173,8 @@ abstract class Optimizer(catalogManager: CatalogManager) // LocalRelation and does not trigger many rules. 
Batch("LocalRelation early", fixedPoint, ConvertToLocalRelation, - PropagateEmptyRelation, + PropagateEmptyRelationBasic, + PropagateEmptyRelationAdvanced(), // PropagateEmptyRelation can change the nullability of an attribute from nullable to // non-nullable when an empty relation child of a Union is removed UpdateAttributeNullability) :: @@ -221,7 +222,8 @@ abstract class Optimizer(catalogManager: CatalogManager) ReassignLambdaVariableID) :+ Batch("LocalRelation", fixedPoint, ConvertToLocalRelation, - PropagateEmptyRelation, + PropagateEmptyRelationBasic, + PropagateEmptyRelationAdvanced(), // PropagateEmptyRelation can change the nullability of an attribute from nullable to // non-nullable when an empty relation child of a Union is removed UpdateAttributeNullability) :+ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 8455b786bc38..da193c65ced5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral +import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -27,16 +28,12 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_ /** * Collapse plans consisting empty local relations generated by [[PruneFilters]]. - * 1. Binary(or Higher)-node Logical Plans + * 1. Higher-node Logical Plans * - Union with all empty children. - * - Join with one or two empty children (including Intersect/Except). * 2. Unary-node Logical Plans - * - Project/Filter/Sample/Join/Limit/Repartition with all empty children. - * - Join with false condition. - * - Aggregate with all empty children and at least one grouping expression. - * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. + * - Project/Filter/Sample with all empty children. */ -object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper with CastSupport { +object PropagateEmptyRelationBasic extends Rule[LogicalPlan] { private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match { case p: LocalRelation => p.data.isEmpty case _ => false @@ -45,12 +42,8 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit private def empty(plan: LogicalPlan) = LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming) - // Construct a project list from plan's output, while the value is always NULL. 
- private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] = - plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) } - - def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( - _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { + override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( + _.containsAnyPattern(LOCAL_RELATION), ruleId) { case p: Union if p.children.exists(isEmptyLocalRelation) => val newChildren = p.children.filterNot(isEmptyLocalRelation) if (newChildren.isEmpty) { @@ -72,11 +65,66 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit } } + case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match { + case _: Project => empty(p) + case _: Filter => empty(p) + case _: Sample => empty(p) + case _ => p + } + } +} + +/** + * The rule used by both normal Optimizer and AQE Optimizer for: + * 1. Binary-node Logical Plans + * - Join with one or two empty children (including Intersect/Except). + * - Join is single column NULL-aware anti join (NAAJ) + * Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an + * empty [[LocalRelation]]. + * - Left semi Join + * Right side is non-empty and condition is empty. Eliminate join to its left side. + * - Left anti join + * Right side is non-empty and condition is empty. Eliminate join to an empty + * [[LocalRelation]]. + * 2. Unary-node Logical Plans + * - Limit/Repartition with all empty children. + * - Aggregate with all empty children and at least one grouping expression. + * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. + * + * @param isEmptyPlan At AQE side, we use the query stage stats to check the check. + * @param isRelationWithAllNullKeys At AQE side, we use the broadcast query stage to do the check. + */ +case class PropagateEmptyRelationAdvanced( + isEmptyPlan: Option[LogicalPlan => Boolean] = None, + isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None) + extends Rule[LogicalPlan] with CastSupport { + + private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = { + isEmptyPlan.getOrElse { + case p: LocalRelation => p.data.isEmpty + case _ => false + }.apply(plan) + } + + private def empty(plan: LogicalPlan) = + LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming) + + // Construct a project list from plan's output, while the value is always NULL. + private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] = + plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) } + + def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( + _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { + + case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) + if isRelationWithAllNullKeys.isDefined && isRelationWithAllNullKeys.get(j.right) => + LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) + // Joins on empty LocalRelations generated from streaming sources are not eliminated // as stateful streaming joins need to perform other state management operations other than // just processing the input data. 
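+    // For example (a sketch of the cases handled below): a right outer join whose left
+    // side is known to be empty keeps every right-side row and fills the left side's
+    // attributes with NULL via
+    //   Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
+    // while an inner join with an empty side collapses to an empty LocalRelation.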
case p @ Join(_, _, joinType, conditionOpt, _) - if !p.children.exists(_.isStreaming) => + if !p.children.exists(_.isStreaming) => val isLeftEmpty = isEmptyLocalRelation(p.left) val isRightEmpty = isEmptyLocalRelation(p.right) val isFalseCondition = conditionOpt match { @@ -103,14 +151,17 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit Project(nullValueProjectList(p.left) ++ p.right.output, p.right) case _ => p } + } else if (joinType == LeftSemi && conditionOpt.isEmpty && + isEmpty.isDefined && !isEmpty.get(p.right)) { + p.left + } else if (joinType == LeftAnti && conditionOpt.isEmpty && + isEmpty.isDefined && !isEmpty.get(p.right)) { + empty(p) } else { p } case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match { - case _: Project => empty(p) - case _: Filter => empty(p) - case _: Sample => empty(p) case _: Sort => empty(p) case _: GlobalLimit if !p.isStreaming => empty(p) case _: LocalLimit if !p.isStreaming => empty(p) @@ -136,4 +187,4 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit case _ => p } } -} +} \ No newline at end of file diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala index c8c1ecd7718b..417118a66b63 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala @@ -33,7 +33,8 @@ class OptimizeLimitZeroSuite extends PlanTest { Batch("OptimizeLimitZero", Once, ReplaceIntersectWithSemiJoin, OptimizeLimitZero, - PropagateEmptyRelation) :: Nil + PropagateEmptyRelationBasic, + PropagateEmptyRelationAdvanced()) :: Nil } val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala index a277a2d339e9..b397be4248d3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala @@ -118,7 +118,8 @@ class OptimizerRuleExclusionSuite extends PlanTest { test("Verify optimized plan after excluding CombineUnions rule") { val excludedRules = Seq( ConvertToLocalRelation.ruleName, - PropagateEmptyRelation.ruleName, + PropagateEmptyRelationBasic.ruleName, + PropagateEmptyRelationAdvanced().ruleName, CombineUnions.ruleName) val testRelation1 = LocalRelation('a.int, 'b.int, 'c.int) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index b5dcb8aa6764..65d4446a778b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -38,7 +38,8 @@ class PropagateEmptyRelationSuite extends PlanTest { ReplaceIntersectWithSemiJoin, PushPredicateThroughNonJoin, PruneFilters, - PropagateEmptyRelation, + PropagateEmptyRelationBasic, + PropagateEmptyRelationAdvanced(), 
CollapseProject) :: Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala index 5913eb79d64b..dff184b68262 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability -import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LogicalPlanIntegrity, PlanHelper} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.internal.SQLConf @@ -29,10 +28,8 @@ import org.apache.spark.util.Utils */ class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] { private val defaultBatches = Seq( - Batch("LocalRelation early", Once, - ConvertToLocalRelation, - EliminateUnnecessaryJoin, - PropagateEmptyRelation, + Batch("Propagate Empty LocalRelation", Once, + PropagateEmptyRelationAdvancedHelper.propagateEmptyRelationAdvanced, UpdateAttributeNullability), Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ConvertToLocalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ConvertToLocalRelation.scala deleted file mode 100644 index d6f2634395a4..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ConvertToLocalRelation.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.adaptive - -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} -import org.apache.spark.sql.catalyst.rules.Rule - -/** - * Converts empty query stage to empty `LocalRelation` - */ -object ConvertToLocalRelation extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case l @ LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined && - stage.getRuntimeStatistics.rowCount.contains(0) => - LocalRelation(l.output, Seq.empty) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala deleted file mode 100644 index eb6d95eb86e7..000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.adaptive - -import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin -import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} -import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys - -/** - * This optimization rule detects and eliminates unnecessary Join: - * 1. Join is single column NULL-aware anti join (NAAJ), and broadcasted [[HashedRelation]] - * is [[HashedRelationWithAllNullKeys]]. Eliminate join to an empty [[LocalRelation]]. - * - * 2. Join is left semi join - * Join right side is non-empty and condition is empty. Eliminate join to its left side. - * - * 3. Join is left anti join - * Join right side is non-empty and condition is empty. Eliminate join to an empty - * [[LocalRelation]]. 
- */ -object EliminateUnnecessaryJoin extends Rule[LogicalPlan] { - - private def isRelationWithAllNullKeys(plan: LogicalPlan) = plan match { - case LogicalQueryStage(_, stage: BroadcastQueryStageExec) - if stage.resultOption.get().isDefined => - stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys - case _ => false - } - - private def checkRowCount(plan: LogicalPlan): Boolean = plan match { - case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => - stage.getRuntimeStatistics.rowCount match { - case Some(count) => count > 0 - case _ => false - } - - case _ => false - } - - def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { - case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys(j.right) => - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - - case j @ Join(_, _, LeftSemi, condition, _) => - if (condition.isEmpty && checkRowCount(j.right)) { - j.left - } else { - j - } - - case j @ Join(_, _, LeftAnti, condition, _) => - if (condition.isEmpty && checkRowCount(j.right)) { - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) - } else { - j - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala new file mode 100644 index 000000000000..3fa2d5b3098f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationAdvanced +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys + +/** + * A helper class to provide a AQE side `PropagateEmptyRelationAdvanced` rule. 
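+ *
+ * For example, once a join's build side has finished, the logical plan seen by the AQE
+ * optimizer wraps the materialized stage in a [[LogicalQueryStage]], roughly (a sketch):
+ * {{{
+ *   Join(left, LogicalQueryStage(_, BroadcastQueryStageExec(...)), LeftAnti, None, hint)
+ * }}}
+ * so the callbacks below can answer the checks from the stage's runtime result instead
+ * of static statistics.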
+ */ +object PropagateEmptyRelationAdvancedHelper { + + private def isRelationWithAllNullKeys(plan: LogicalPlan) = plan match { + case LogicalQueryStage(_, stage: BroadcastQueryStageExec) + if stage.resultOption.get().isDefined => + stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys + case _ => false + } + + private def checkRowCount(plan: LogicalPlan): Boolean = plan match { + case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => + stage.getRuntimeStatistics.rowCount match { + case Some(count) => count == 0 + case _ => false + } + case _ => false + } + + lazy val propagateEmptyRelationAdvanced: PropagateEmptyRelationAdvanced = { + PropagateEmptyRelationAdvanced(Some(checkRowCount), Some(isRelationWithAllNullKeys)) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index d2c97706a054..5edf5dc6d4f0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ +import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationAdvanced import org.apache.spark.sql.catalyst.plans.ExistenceJoin import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ @@ -1382,7 +1383,7 @@ abstract class DynamicPartitionPruningSuiteBase withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", - SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> PropagateEmptyRelationAdvanced().ruleName) { val df = sql( """ |SELECT * FROM fact_sk f diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 867e5be6b3d1..a42714f275e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, PropagateEmptyRelationAdvanced} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.command.DataWritingCommandExec @@ -237,7 +237,7 @@ class AdaptiveQueryExecSuite withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", - SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> ConvertToLocalRelation.ruleName) { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> PropagateEmptyRelationAdvanced().ruleName) { val df1 = 
spark.range(10).withColumn("a", 'id) val df2 = spark.range(10).withColumn("b", 'id) withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { @@ -1234,7 +1234,7 @@ class AdaptiveQueryExecSuite SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString, // This test is a copy of test(SPARK-32573), in order to test the configuration // `spark.sql.adaptive.optimizer.excludedRules` works as expect. - SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> EliminateUnnecessaryJoin.ruleName) { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> PropagateEmptyRelationAdvanced().ruleName) { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)") val bhj = findTopLevelBroadcastHashJoin(plan) From 7b80db02cc0a07a48aa378e03658d5c6024459ba Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 21 May 2021 14:38:16 +0800 Subject: [PATCH 06/23] fix --- .../optimizer/PropagateEmptyRelation.scala | 36 +++++++++++-------- .../sql/catalyst/rules/RuleIdCollection.scala | 2 +- ...PropagateEmptyRelationAdvancedHelper.scala | 4 +-- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index da193c65ced5..3f8301cdb0cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJo import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL} +import org.apache.spark.sql.catalyst.trees.TreePattern.LOCAL_RELATION /** * Collapse plans consisting empty local relations generated by [[PruneFilters]]. @@ -87,23 +87,29 @@ object PropagateEmptyRelationBasic extends Rule[LogicalPlan] { * Right side is non-empty and condition is empty. Eliminate join to an empty * [[LocalRelation]]. * 2. Unary-node Logical Plans - * - Limit/Repartition with all empty children. - * - Aggregate with all empty children and at least one grouping expression. - * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. + * - Limit/Repartition with all empty children. + * - Aggregate with all empty children and at least one grouping expression. + * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. * - * @param isEmptyPlan At AQE side, we use the query stage stats to check the check. + * @param checkRowCount At AQE side, we use the query stage stats to check the check. * @param isRelationWithAllNullKeys At AQE side, we use the broadcast query stage to do the check. 
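+ *
+ * A sketch of the intended callback semantics, where `stagePlan` stands in for a plan
+ * whose query stage has already materialized:
+ * {{{
+ *   checkRowCount.get(stagePlan, true)   // true only if the stage reports > 0 rows
+ *   checkRowCount.get(stagePlan, false)  // true only if the stage reports 0 rows
+ *   // an unknown row count yields false for both, so no rewrite is attempted
+ * }}}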
*/ case class PropagateEmptyRelationAdvanced( - isEmptyPlan: Option[LogicalPlan => Boolean] = None, + checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None, isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None) extends Rule[LogicalPlan] with CastSupport { private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = { - isEmptyPlan.getOrElse { + val defaultEmptyRelation: Boolean = plan match { case p: LocalRelation => p.data.isEmpty case _ => false - }.apply(plan) + } + + if (checkRowCount.isDefined) { + checkRowCount.get.apply(plan, false) || defaultEmptyRelation + } else { + defaultEmptyRelation + } } private def empty(plan: LogicalPlan) = @@ -113,12 +119,12 @@ case class PropagateEmptyRelationAdvanced( private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] = plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) } - def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( - _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { - + // We can not use transformUpWithPruning here since this rule is used by both normal Optimizer + // and AQE Optimizer. And this may only effective at AQE side. + def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys.isDefined && isRelationWithAllNullKeys.get(j.right) => - LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) + empty(j) // Joins on empty LocalRelations generated from streaming sources are not eliminated // as stateful streaming joins need to perform other state management operations other than @@ -152,10 +158,10 @@ case class PropagateEmptyRelationAdvanced( case _ => p } } else if (joinType == LeftSemi && conditionOpt.isEmpty && - isEmpty.isDefined && !isEmpty.get(p.right)) { + checkRowCount.isDefined && checkRowCount.get.apply(p.right, true)) { p.left } else if (joinType == LeftAnti && conditionOpt.isEmpty && - isEmpty.isDefined && !isEmpty.get(p.right)) { + checkRowCount.isDefined && checkRowCount.get.apply(p.right, true)) { empty(p) } else { p @@ -187,4 +193,4 @@ case class PropagateEmptyRelationAdvanced( case _ => p } } -} \ No newline at end of file +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index 605b57e46fc1..8b0908f0e1db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -117,7 +117,7 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.optimizer.OptimizeRepartition" :: "org.apache.spark.sql.catalyst.optimizer.OptimizeWindowFunctions" :: "org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields":: - "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation" :: + "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBasic" :: "org.apache.spark.sql.catalyst.optimizer.PruneFilters" :: "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin" :: "org.apache.spark.sql.catalyst.optimizer.PushExtraPredicateThroughJoin" :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala index 3fa2d5b3098f..a7bac5a1999a 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala @@ -33,10 +33,10 @@ object PropagateEmptyRelationAdvancedHelper { case _ => false } - private def checkRowCount(plan: LogicalPlan): Boolean = plan match { + private def checkRowCount(plan: LogicalPlan, hasRow: Boolean): Boolean = plan match { case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => stage.getRuntimeStatistics.rowCount match { - case Some(count) => count == 0 + case Some(count) => hasRow == (count > 0) case _ => false } case _ => false From 05e074c0e841b5c84aa17f3420fcd78f3b5fbfb1 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 21 May 2021 16:41:44 +0800 Subject: [PATCH 07/23] split PropagateEmptyRelationBase --- .../sql/catalyst/optimizer/Optimizer.scala | 4 +- .../optimizer/PropagateEmptyRelation.scala | 24 +++++++----- .../sql/catalyst/rules/RuleIdCollection.scala | 2 + .../optimizer/OptimizeLimitZeroSuite.scala | 2 +- .../OptimizerRuleExclusionSuite.scala | 2 +- .../PropagateEmptyRelationSuite.scala | 2 +- .../sql/execution/adaptive/AQEOptimizer.scala | 2 +- ....scala => AQEPropagateEmptyRelation.scala} | 38 ++++++++++--------- .../sql/DynamicPartitionPruningSuite.scala | 4 +- .../adaptive/AdaptiveQueryExecSuite.scala | 6 +-- 10 files changed, 48 insertions(+), 38 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/{PropagateEmptyRelationAdvancedHelper.scala => AQEPropagateEmptyRelation.scala} (54%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 272575bf13e3..d3b73344152c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -174,7 +174,7 @@ abstract class Optimizer(catalogManager: CatalogManager) Batch("LocalRelation early", fixedPoint, ConvertToLocalRelation, PropagateEmptyRelationBasic, - PropagateEmptyRelationAdvanced(), + PropagateEmptyRelation, // PropagateEmptyRelation can change the nullability of an attribute from nullable to // non-nullable when an empty relation child of a Union is removed UpdateAttributeNullability) :: @@ -223,7 +223,7 @@ abstract class Optimizer(catalogManager: CatalogManager) Batch("LocalRelation", fixedPoint, ConvertToLocalRelation, PropagateEmptyRelationBasic, - PropagateEmptyRelationAdvanced(), + PropagateEmptyRelation, // PropagateEmptyRelation can change the nullability of an attribute from nullable to // non-nullable when an empty relation child of a Union is removed UpdateAttributeNullability) :+ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 3f8301cdb0cb..57d7b45a06d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJo import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import 
org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.catalyst.trees.TreePattern.LOCAL_RELATION +import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL} /** * Collapse plans consisting empty local relations generated by [[PruneFilters]]. @@ -90,14 +90,17 @@ object PropagateEmptyRelationBasic extends Rule[LogicalPlan] { * - Limit/Repartition with all empty children. * - Aggregate with all empty children and at least one grouping expression. * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. - * - * @param checkRowCount At AQE side, we use the query stage stats to check the check. - * @param isRelationWithAllNullKeys At AQE side, we use the broadcast query stage to do the check. */ -case class PropagateEmptyRelationAdvanced( - checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None, - isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None) - extends Rule[LogicalPlan] with CastSupport { +trait PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport { + /** + * At AQE side, we use this function to check if a plan has output rows or not + */ + protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None + + /** + * At AQE side, we use the broadcast query stage to do the check + */ + protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = { val defaultEmptyRelation: Boolean = plan match { @@ -121,7 +124,8 @@ case class PropagateEmptyRelationAdvanced( // We can not use transformUpWithPruning here since this rule is used by both normal Optimizer // and AQE Optimizer. And this may only effective at AQE side. - def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { + def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( + _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys.isDefined && isRelationWithAllNullKeys.get(j.right) => empty(j) @@ -194,3 +198,5 @@ case class PropagateEmptyRelationAdvanced( } } } + +object PropagateEmptyRelation extends PropagateEmptyRelationBase diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index 8b0908f0e1db..6e1b58f675b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -118,6 +118,8 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.optimizer.OptimizeWindowFunctions" :: "org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields":: "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBasic" :: + "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation" :: + "org.apache.spark.sql.catalyst.optimizer.AQEPropagateEmptyRelation" :: "org.apache.spark.sql.catalyst.optimizer.PruneFilters" :: "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin" :: "org.apache.spark.sql.catalyst.optimizer.PushExtraPredicateThroughJoin" :: diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala index 417118a66b63..24c53c98894e 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala @@ -34,7 +34,7 @@ class OptimizeLimitZeroSuite extends PlanTest { ReplaceIntersectWithSemiJoin, OptimizeLimitZero, PropagateEmptyRelationBasic, - PropagateEmptyRelationAdvanced()) :: Nil + PropagateEmptyRelation) :: Nil } val testRelation1 = LocalRelation.fromExternalRows(Seq('a.int), data = Seq(Row(1))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala index b397be4248d3..e7fa574f524d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala @@ -119,7 +119,7 @@ class OptimizerRuleExclusionSuite extends PlanTest { val excludedRules = Seq( ConvertToLocalRelation.ruleName, PropagateEmptyRelationBasic.ruleName, - PropagateEmptyRelationAdvanced().ruleName, + PropagateEmptyRelation.ruleName, CombineUnions.ruleName) val testRelation1 = LocalRelation('a.int, 'b.int, 'c.int) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index 65d4446a778b..d81c8cd1cb41 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -39,7 +39,7 @@ class PropagateEmptyRelationSuite extends PlanTest { PushPredicateThroughNonJoin, PruneFilters, PropagateEmptyRelationBasic, - PropagateEmptyRelationAdvanced(), + PropagateEmptyRelation, CollapseProject) :: Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala index dff184b68262..7ea543089808 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala @@ -29,7 +29,7 @@ import org.apache.spark.util.Utils class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] { private val defaultBatches = Seq( Batch("Propagate Empty LocalRelation", Once, - PropagateEmptyRelationAdvancedHelper.propagateEmptyRelationAdvanced, + AQEPropagateEmptyRelation, UpdateAttributeNullability), Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala similarity index 54% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index a7bac5a1999a..099a2a1ea0ea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PropagateEmptyRelationAdvancedHelper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -17,32 +17,34 
@@ package org.apache.spark.sql.execution.adaptive -import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationAdvanced +import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys /** - * A helper class to provide a AQE side `PropagateEmptyRelationAdvanced` rule. + * Rule [[PropagateEmptyRelation]] at AQE side. */ -object PropagateEmptyRelationAdvancedHelper { +object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { - private def isRelationWithAllNullKeys(plan: LogicalPlan) = plan match { - case LogicalQueryStage(_, stage: BroadcastQueryStageExec) - if stage.resultOption.get().isDefined => - stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys - case _ => false - } - - private def checkRowCount(plan: LogicalPlan, hasRow: Boolean): Boolean = plan match { - case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => - stage.getRuntimeStatistics.rowCount match { - case Some(count) => hasRow == (count > 0) + override protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = { + case (plan, hasRow) => + Some(plan match { + case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => + stage.getRuntimeStatistics.rowCount match { + case Some(count) => hasRow == (count > 0) + case _ => false + } case _ => false - } - case _ => false + }) } - lazy val propagateEmptyRelationAdvanced: PropagateEmptyRelationAdvanced = { - PropagateEmptyRelationAdvanced(Some(checkRowCount), Some(isRelationWithAllNullKeys)) + override protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = { + case plan => + Some(plan match { + case LogicalQueryStage(_, stage: BroadcastQueryStageExec) + if stage.resultOption.get().isDefined => + stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys + case _ => false + }) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 5edf5dc6d4f0..36234e6e5c99 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -21,7 +21,7 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ -import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationAdvanced +import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation import org.apache.spark.sql.catalyst.plans.ExistenceJoin import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ @@ -1383,7 +1383,7 @@ abstract class DynamicPartitionPruningSuiteBase withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", - SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> PropagateEmptyRelationAdvanced().ruleName) { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> PropagateEmptyRelation.ruleName) { val df = sql( """ |SELECT * FROM fact_sk f diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index a42714f275e8..549ec352027f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} -import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, PropagateEmptyRelationAdvanced} +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.command.DataWritingCommandExec @@ -237,7 +237,7 @@ class AdaptiveQueryExecSuite withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", - SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> PropagateEmptyRelationAdvanced().ruleName) { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) { val df1 = spark.range(10).withColumn("a", 'id) val df2 = spark.range(10).withColumn("b", 'id) withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { @@ -1234,7 +1234,7 @@ class AdaptiveQueryExecSuite SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString, // This test is a copy of test(SPARK-32573), in order to test the configuration // `spark.sql.adaptive.optimizer.excludedRules` works as expect. 
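      // A hedged aside on the configuration exercised here: the config value is a
      // comma-separated list of ruleName strings, and an excluded rule is skipped by the
      // AQE optimizer for that session. A minimal user-code sketch, assuming an active
      // SparkSession named `spark` (an assumed name, not part of this test):
      //
      //   spark.conf.set(
      //     SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key,
      //     AQEPropagateEmptyRelation.ruleName)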
- SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> PropagateEmptyRelationAdvanced().ruleName) { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)") val bhj = findTopLevelBroadcastHashJoin(plan) From 48dd92a741ab87c50e7907d9169148994bf62a3a Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Fri, 21 May 2021 18:28:07 +0800 Subject: [PATCH 08/23] fix --- .../optimizer/PropagateEmptyRelation.scala | 33 ++++++++------- .../sql/catalyst/rules/RuleIdCollection.scala | 1 - .../adaptive/AQEPropagateEmptyRelation.scala | 41 ++++++++++--------- 3 files changed, 38 insertions(+), 37 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 57d7b45a06d3..ef5a90422bd9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL} @@ -91,16 +91,16 @@ object PropagateEmptyRelationBasic extends Rule[LogicalPlan] { * - Aggregate with all empty children and at least one grouping expression. * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. */ -trait PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport { +abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport { /** * At AQE side, we use this function to check if a plan has output rows or not */ - protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None + protected def checkRowCount(plan: LogicalPlan, hasRow: Boolean): Option[Boolean] = None /** * At AQE side, we use the broadcast query stage to do the check */ - protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None + protected def isRelationWithAllNullKeys(plan: LogicalPlan): Option[Boolean] = None private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = { val defaultEmptyRelation: Boolean = plan match { @@ -108,11 +108,8 @@ trait PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport { case _ => false } - if (checkRowCount.isDefined) { - checkRowCount.get.apply(plan, false) || defaultEmptyRelation - } else { - defaultEmptyRelation - } + checkRowCount(plan, false).map(_ || defaultEmptyRelation) + .getOrElse(defaultEmptyRelation) } private def empty(plan: LogicalPlan) = @@ -122,12 +119,9 @@ trait PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport { private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] = plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) } - // We can not use transformUpWithPruning here since this rule is used by both normal Optimizer - // and AQE Optimizer. 
And this may only effective at AQE side. - def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( - _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { + protected def applyInternal: PartialFunction[LogicalPlan, LogicalPlan] = { case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) - if isRelationWithAllNullKeys.isDefined && isRelationWithAllNullKeys.get(j.right) => + if isRelationWithAllNullKeys(j.right).contains(true) => empty(j) // Joins on empty LocalRelations generated from streaming sources are not eliminated @@ -162,10 +156,10 @@ trait PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport { case _ => p } } else if (joinType == LeftSemi && conditionOpt.isEmpty && - checkRowCount.isDefined && checkRowCount.get.apply(p.right, true)) { + checkRowCount(p.right, true).contains(true)) { p.left } else if (joinType == LeftAnti && conditionOpt.isEmpty && - checkRowCount.isDefined && checkRowCount.get.apply(p.right, true)) { + checkRowCount(p.right, true).contains(true)) { empty(p) } else { p @@ -199,4 +193,9 @@ trait PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport { } } -object PropagateEmptyRelation extends PropagateEmptyRelationBase +object PropagateEmptyRelation extends PropagateEmptyRelationBase { + def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( + _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { + applyInternal + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index 6e1b58f675b8..3622f2533186 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -119,7 +119,6 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields":: "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBasic" :: "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation" :: - "org.apache.spark.sql.catalyst.optimizer.AQEPropagateEmptyRelation" :: "org.apache.spark.sql.catalyst.optimizer.PruneFilters" :: "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin" :: "org.apache.spark.sql.catalyst.optimizer.PushExtraPredicateThroughJoin" :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 099a2a1ea0ea..4c8f5d3b066c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -22,29 +22,32 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys /** - * Rule [[PropagateEmptyRelation]] at AQE side. + * Rule [[PropagateEmptyRelationBase]] at AQE side. 
*/ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { - override protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = { - case (plan, hasRow) => - Some(plan match { - case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => - stage.getRuntimeStatistics.rowCount match { - case Some(count) => hasRow == (count > 0) - case _ => false - } - case _ => false - }) + override protected def checkRowCount(plan: LogicalPlan, hasRow: Boolean): Option[Boolean] = { + Some(plan match { + case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => + stage.getRuntimeStatistics.rowCount match { + case Some(count) => hasRow == (count > 0) + case _ => false + } + case _ => false + }) } - override protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = { - case plan => - Some(plan match { - case LogicalQueryStage(_, stage: BroadcastQueryStageExec) - if stage.resultOption.get().isDefined => - stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys - case _ => false - }) + override protected def isRelationWithAllNullKeys(plan: LogicalPlan): Option[Boolean] = { + Some(plan match { + case LogicalQueryStage(_, stage: BroadcastQueryStageExec) + if stage.resultOption.get().isDefined => + stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys + case _ => false + }) + } + + // TODO we need use transformUpWithPruning instead of transformUp + def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { + applyInternal } } From 8f4dc8047f90067e0580f0743d329ec18d25e3bd Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Sun, 23 May 2021 22:04:57 +0800 Subject: [PATCH 09/23] merge --- .../sql/catalyst/optimizer/Optimizer.scala | 2 - .../optimizer/PropagateEmptyRelation.scala | 129 +++++++++--------- .../optimizer/OptimizeLimitZeroSuite.scala | 1 - .../OptimizerRuleExclusionSuite.scala | 1 - .../PropagateEmptyRelationSuite.scala | 1 - .../adaptive/AQEPropagateEmptyRelation.scala | 26 ++-- .../sql/DynamicPartitionPruningSuite.scala | 3 +- 7 files changed, 77 insertions(+), 86 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d3b73344152c..19e9312715db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -173,7 +173,6 @@ abstract class Optimizer(catalogManager: CatalogManager) // LocalRelation and does not trigger many rules. 
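    // A hedged illustration of the simplification this early batch targets, assuming an
    // active SparkSession named `spark` and `import spark.implicits._` (assumed setup,
    // not part of this patch): a Filter over an already-empty LocalRelation collapses
    // here, so the heavier batches that follow see only a trivial plan.
    //
    //   val df = Seq.empty[Int].toDF("id").filter($"id" > 10)
    //   df.queryExecution.optimizedPlan   // LocalRelation <empty>, [id]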
Batch("LocalRelation early", fixedPoint, ConvertToLocalRelation, - PropagateEmptyRelationBasic, PropagateEmptyRelation, // PropagateEmptyRelation can change the nullability of an attribute from nullable to // non-nullable when an empty relation child of a Union is removed @@ -222,7 +221,6 @@ abstract class Optimizer(catalogManager: CatalogManager) ReassignLambdaVariableID) :+ Batch("LocalRelation", fixedPoint, ConvertToLocalRelation, - PropagateEmptyRelationBasic, PropagateEmptyRelation, // PropagateEmptyRelation can change the nullability of an attribute from nullable to // non-nullable when an empty relation child of a Union is removed diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index ef5a90422bd9..883e406fb69a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -26,54 +26,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL} -/** - * Collapse plans consisting empty local relations generated by [[PruneFilters]]. - * 1. Higher-node Logical Plans - * - Union with all empty children. - * 2. Unary-node Logical Plans - * - Project/Filter/Sample with all empty children. - */ -object PropagateEmptyRelationBasic extends Rule[LogicalPlan] { - private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match { - case p: LocalRelation => p.data.isEmpty - case _ => false - } - - private def empty(plan: LogicalPlan) = - LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming) - - override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( - _.containsAnyPattern(LOCAL_RELATION), ruleId) { - case p: Union if p.children.exists(isEmptyLocalRelation) => - val newChildren = p.children.filterNot(isEmptyLocalRelation) - if (newChildren.isEmpty) { - empty(p) - } else { - val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head - val outputs = newPlan.output.zip(p.output) - // the original Union may produce different output attributes than the new one so we alias - // them if needed - if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) { - newPlan - } else { - val outputAliases = outputs.map { case (newAttr, oldAttr) => - val newExplicitMetadata = - if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None - Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata) - } - Project(outputAliases, newPlan) - } - } - - case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match { - case _: Project => empty(p) - case _: Filter => empty(p) - case _: Sample => empty(p) - case _ => p - } - } -} - /** * The rule used by both normal Optimizer and AQE Optimizer for: * 1. 
Binary-node Logical Plans @@ -95,33 +47,37 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup /** * At AQE side, we use this function to check if a plan has output rows or not */ - protected def checkRowCount(plan: LogicalPlan, hasRow: Boolean): Option[Boolean] = None + protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None /** * At AQE side, we use the broadcast query stage to do the check */ - protected def isRelationWithAllNullKeys(plan: LogicalPlan): Option[Boolean] = None + protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None - private def isEmptyLocalRelation(plan: LogicalPlan): Boolean = { - val defaultEmptyRelation: Boolean = plan match { - case p: LocalRelation => p.data.isEmpty - case _ => false - } + protected def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match { + case p: LocalRelation => p.data.isEmpty + case _ => false + } - checkRowCount(plan, false).map(_ || defaultEmptyRelation) - .getOrElse(defaultEmptyRelation) + private def isEmptyLocalRelationWithRowCount(plan: LogicalPlan): Boolean = { + val defaultEmptyRelation: Boolean = isEmptyLocalRelation(plan) + if (checkRowCount.isDefined) { + checkRowCount.get.apply(plan, false) || defaultEmptyRelation + } else { + defaultEmptyRelation + } } - private def empty(plan: LogicalPlan) = + protected def empty(plan: LogicalPlan): LocalRelation = LocalRelation(plan.output, data = Seq.empty, isStreaming = plan.isStreaming) // Construct a project list from plan's output, while the value is always NULL. private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] = plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) } - protected def applyInternal: PartialFunction[LogicalPlan, LogicalPlan] = { + protected def propagateEmptyRelationAdvanced: PartialFunction[LogicalPlan, LogicalPlan] = { case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) - if isRelationWithAllNullKeys(j.right).contains(true) => + if isRelationWithAllNullKeys.isDefined && isRelationWithAllNullKeys.get.apply(j.right) => empty(j) // Joins on empty LocalRelations generated from streaming sources are not eliminated @@ -129,8 +85,8 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup // just processing the input data. 
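    // A hedged, batch-only illustration of the join elimination handled below, assuming
    // an active SparkSession named `spark` and `import spark.implicits._` (assumed setup,
    // not part of this patch): an inner join with a known-empty side folds to an empty
    // LocalRelation.
    //
    //   val left = Seq.empty[(Int, Int)].toDF("k", "v")
    //   val right = Seq((1, 2)).toDF("k", "w")
    //   left.join(right, "k").queryExecution.optimizedPlan   // LocalRelation <empty>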
case p @ Join(_, _, joinType, conditionOpt, _) if !p.children.exists(_.isStreaming) => - val isLeftEmpty = isEmptyLocalRelation(p.left) - val isRightEmpty = isEmptyLocalRelation(p.right) + val isLeftEmpty = isEmptyLocalRelationWithRowCount(p.left) + val isRightEmpty = isEmptyLocalRelationWithRowCount(p.right) val isFalseCondition = conditionOpt match { case Some(FalseLiteral) => true case _ => false @@ -156,16 +112,17 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup case _ => p } } else if (joinType == LeftSemi && conditionOpt.isEmpty && - checkRowCount(p.right, true).contains(true)) { + checkRowCount.isDefined && checkRowCount.get.apply(p.right, true)) { p.left } else if (joinType == LeftAnti && conditionOpt.isEmpty && - checkRowCount(p.right, true).contains(true)) { + checkRowCount.isDefined && checkRowCount.get.apply(p.right, true)) { empty(p) } else { p } - case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match { + case p: UnaryNode + if p.children.nonEmpty && p.children.forall(isEmptyLocalRelationWithRowCount) => p match { case _: Sort => empty(p) case _: GlobalLimit if !p.isStreaming => empty(p) case _: LocalLimit if !p.isStreaming => empty(p) @@ -193,9 +150,47 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup } } +/** + * Rule [[PropagateEmptyRelationBase]] at normal optimizer side. + * With the extra optimal pattern: + * 1. Higher-node Logical Plans + * - Union with all empty children. + * 2. Unary-node Logical Plans + * - Project/Filter/Sample with all empty children. + */ object PropagateEmptyRelation extends PropagateEmptyRelationBase { - def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( + private def propagateEmptyRelationBasic: PartialFunction[LogicalPlan, LogicalPlan] = { + case p: Union if p.children.exists(isEmptyLocalRelation) => + val newChildren = p.children.filterNot(isEmptyLocalRelation) + if (newChildren.isEmpty) { + empty(p) + } else { + val newPlan = if (newChildren.size > 1) Union(newChildren) else newChildren.head + val outputs = newPlan.output.zip(p.output) + // the original Union may produce different output attributes than the new one so we alias + // them if needed + if (outputs.forall { case (newAttr, oldAttr) => newAttr.exprId == oldAttr.exprId }) { + newPlan + } else { + val outputAliases = outputs.map { case (newAttr, oldAttr) => + val newExplicitMetadata = + if (oldAttr.metadata != newAttr.metadata) Some(oldAttr.metadata) else None + Alias(newAttr, oldAttr.name)(oldAttr.exprId, explicitMetadata = newExplicitMetadata) + } + Project(outputAliases, newPlan) + } + } + + case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match { + case _: Project => empty(p) + case _: Filter => empty(p) + case _: Sample => empty(p) + case _ => p + } + } + + override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { - applyInternal + propagateEmptyRelationBasic.orElse(propagateEmptyRelationAdvanced) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala index 24c53c98894e..c8c1ecd7718b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeLimitZeroSuite.scala @@ -33,7 +33,6 @@ class OptimizeLimitZeroSuite extends PlanTest { Batch("OptimizeLimitZero", Once, ReplaceIntersectWithSemiJoin, OptimizeLimitZero, - PropagateEmptyRelationBasic, PropagateEmptyRelation) :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala index e7fa574f524d..a277a2d339e9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala @@ -118,7 +118,6 @@ class OptimizerRuleExclusionSuite extends PlanTest { test("Verify optimized plan after excluding CombineUnions rule") { val excludedRules = Seq( ConvertToLocalRelation.ruleName, - PropagateEmptyRelationBasic.ruleName, PropagateEmptyRelation.ruleName, CombineUnions.ruleName) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala index d81c8cd1cb41..b5dcb8aa6764 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala @@ -38,7 +38,6 @@ class PropagateEmptyRelationSuite extends PlanTest { ReplaceIntersectWithSemiJoin, PushPredicateThroughNonJoin, PruneFilters, - PropagateEmptyRelationBasic, PropagateEmptyRelation, CollapseProject) :: Nil } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 4c8f5d3b066c..8aca5416cbf4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -22,23 +22,25 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys /** - * Rule [[PropagateEmptyRelationBase]] at AQE side. + * Rule [[PropagateEmptyRelationBase]] at AQE optimizer side. 
*/ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { - override protected def checkRowCount(plan: LogicalPlan, hasRow: Boolean): Option[Boolean] = { - Some(plan match { - case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => - stage.getRuntimeStatistics.rowCount match { - case Some(count) => hasRow == (count > 0) - case _ => false - } - case _ => false + override protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = { + Some((plan, hasRow) => { + plan match { + case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => + stage.getRuntimeStatistics.rowCount match { + case Some(count) => hasRow == (count > 0) + case _ => false + } + case _ => false + } }) } - override protected def isRelationWithAllNullKeys(plan: LogicalPlan): Option[Boolean] = { - Some(plan match { + override protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = { + Some({ case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined => stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys @@ -48,6 +50,6 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { // TODO we need use transformUpWithPruning instead of transformUp def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { - applyInternal + propagateEmptyRelationAdvanced } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index 36234e6e5c99..f5ed511c5906 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -21,7 +21,6 @@ import org.scalatest.GivenWhenThen import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, Expression} import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._ -import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation import org.apache.spark.sql.catalyst.plans.ExistenceJoin import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive._ @@ -1383,7 +1382,7 @@ abstract class DynamicPartitionPruningSuiteBase withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", - SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> PropagateEmptyRelation.ruleName) { + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) { val df = sql( """ |SELECT * FROM fact_sk f From 12200876b863f10b428741e0ac69ee76d5802b1d Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Sun, 23 May 2021 22:08:57 +0800 Subject: [PATCH 10/23] nit --- .../spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala | 2 +- .../org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 883e406fb69a..b3af7c93ca33 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._ import 
org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.plans._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala index 3622f2533186..605b57e46fc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala @@ -117,7 +117,6 @@ object RuleIdCollection { "org.apache.spark.sql.catalyst.optimizer.OptimizeRepartition" :: "org.apache.spark.sql.catalyst.optimizer.OptimizeWindowFunctions" :: "org.apache.spark.sql.catalyst.optimizer.OptimizeUpdateFields":: - "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBasic" :: "org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelation" :: "org.apache.spark.sql.catalyst.optimizer.PruneFilters" :: "org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin" :: From e26df96b72806ace84c72e1f134278e2b20b4457 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 24 May 2021 09:15:43 +0800 Subject: [PATCH 11/23] fix --- .../spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index b3af7c93ca33..69025509c61a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -191,6 +191,7 @@ object PropagateEmptyRelation extends PropagateEmptyRelationBase { override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { - propagateEmptyRelationBasic.orElse(propagateEmptyRelationAdvanced) + // andThen instead of orElse, because there exists some same pattern, like UnaryNode. 
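    // A generic, self-contained sketch of the difference between the two combinators
    // (toy names standing in for the basic and advanced rule sets; not Catalyst types):
    val basic: PartialFunction[Int, String] = {
      case 1 => "one"
      case 2 => "two"
    }
    val advanced: PartialFunction[Int, String] = {
      case 2 => "TWO"
      case 3 => "three"
    }
    (basic orElse advanced)(2)                      // "two": the overlapping case never reaches `advanced`
    (basic orElse advanced)(3)                      // "three": orElse falls through only where `basic` is undefined
    basic.andThen((s: String) => s.toUpperCase)(2)  // "TWO": andThen pipes every `basic` result onward
    // Note that when the second stage is itself a PartialFunction, plain andThen applies it
    // unconditionally (before Scala 2.13's partial-function overload) and can throw a
    // MatchError where it is undefined -- presumably why a later commit goes back to
    // orElse with an explicit canPropagate guard.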
+ propagateEmptyRelationBasic.andThen(propagateEmptyRelationAdvanced) } } From f7a14cf49ac27336195dbcb6285b2b2e5f96df38 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 24 May 2021 10:26:15 +0800 Subject: [PATCH 12/23] fix --- .../optimizer/PropagateEmptyRelation.scala | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 69025509c61a..2a3f83c72580 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -181,17 +181,21 @@ object PropagateEmptyRelation extends PropagateEmptyRelationBase { } } - case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) => p match { - case _: Project => empty(p) - case _: Filter => empty(p) - case _: Sample => empty(p) - case _ => p - } + case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) && + canPropagate(p) => + empty(p) + } + + // extract the pattern avoid conflict with propagateEmptyRelationAdvanced + private def canPropagate(plan: LogicalPlan): Boolean = plan match { + case _: Project => true + case _: Filter => true + case _: Sample => true + case _ => false } override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { - // andThen instead of orElse, because there exists some same pattern, like UnaryNode. - propagateEmptyRelationBasic.andThen(propagateEmptyRelationAdvanced) + propagateEmptyRelationBasic.orElse(propagateEmptyRelationAdvanced) } } From 2c0dfb03cb1d95439c06fd316f7fb3a7aab55b6d Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 24 May 2021 16:53:03 +0800 Subject: [PATCH 13/23] empty --- .../optimizer/PropagateEmptyRelation.scala | 35 +++++++------------ .../adaptive/AQEPropagateEmptyRelation.scala | 21 ++++++----- 2 files changed, 22 insertions(+), 34 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 2a3f83c72580..8f0ae71ed8ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -44,28 +44,19 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_ * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. 
*/ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport { - /** - * At AQE side, we use this function to check if a plan has output rows or not - */ - protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = None - /** * At AQE side, we use the broadcast query stage to do the check */ protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None - protected def isEmptyLocalRelation(plan: LogicalPlan): Boolean = plan match { + protected def isEmpty(plan: LogicalPlan): Boolean = plan match { case p: LocalRelation => p.data.isEmpty case _ => false } - private def isEmptyLocalRelationWithRowCount(plan: LogicalPlan): Boolean = { - val defaultEmptyRelation: Boolean = isEmptyLocalRelation(plan) - if (checkRowCount.isDefined) { - checkRowCount.get.apply(plan, false) || defaultEmptyRelation - } else { - defaultEmptyRelation - } + protected def nonEmpty(plan: LogicalPlan): Boolean = plan match { + case p: LocalRelation => p.data.nonEmpty + case _ => false } protected def empty(plan: LogicalPlan): LocalRelation = @@ -85,8 +76,8 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup // just processing the input data. case p @ Join(_, _, joinType, conditionOpt, _) if !p.children.exists(_.isStreaming) => - val isLeftEmpty = isEmptyLocalRelationWithRowCount(p.left) - val isRightEmpty = isEmptyLocalRelationWithRowCount(p.right) + val isLeftEmpty = isEmpty(p.left) + val isRightEmpty = isEmpty(p.right) val isFalseCondition = conditionOpt match { case Some(FalseLiteral) => true case _ => false @@ -111,18 +102,16 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup Project(nullValueProjectList(p.left) ++ p.right.output, p.right) case _ => p } - } else if (joinType == LeftSemi && conditionOpt.isEmpty && - checkRowCount.isDefined && checkRowCount.get.apply(p.right, true)) { + } else if (joinType == LeftSemi && conditionOpt.isEmpty && nonEmpty(p.right)) { p.left - } else if (joinType == LeftAnti && conditionOpt.isEmpty && - checkRowCount.isDefined && checkRowCount.get.apply(p.right, true)) { + } else if (joinType == LeftAnti && conditionOpt.isEmpty && nonEmpty(p.right)) { empty(p) } else { p } case p: UnaryNode - if p.children.nonEmpty && p.children.forall(isEmptyLocalRelationWithRowCount) => p match { + if p.children.nonEmpty && p.children.forall(isEmpty) => p match { case _: Sort => empty(p) case _: GlobalLimit if !p.isStreaming => empty(p) case _: LocalLimit if !p.isStreaming => empty(p) @@ -160,8 +149,8 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup */ object PropagateEmptyRelation extends PropagateEmptyRelationBase { private def propagateEmptyRelationBasic: PartialFunction[LogicalPlan, LogicalPlan] = { - case p: Union if p.children.exists(isEmptyLocalRelation) => - val newChildren = p.children.filterNot(isEmptyLocalRelation) + case p: Union if p.children.exists(isEmpty) => + val newChildren = p.children.filterNot(isEmpty) if (newChildren.isEmpty) { empty(p) } else { @@ -181,7 +170,7 @@ object PropagateEmptyRelation extends PropagateEmptyRelationBase { } } - case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmptyLocalRelation) && + case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) && canPropagate(p) => empty(p) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 8aca5416cbf4..ccbc0f50b61d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -26,17 +26,16 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys */ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { - override protected def checkRowCount: Option[(LogicalPlan, Boolean) => Boolean] = { - Some((plan, hasRow) => { - plan match { - case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => - stage.getRuntimeStatistics.rowCount match { - case Some(count) => hasRow == (count > 0) - case _ => false - } - case _ => false - } - }) + override protected def isEmpty(plan: LogicalPlan): Boolean = + super.isEmpty(plan) || getRowCount(plan).contains(0) + + override protected def nonEmpty(plan: LogicalPlan): Boolean = + super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0) + + private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match { + case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => + stage.getRuntimeStatistics.rowCount + case _ => None } override protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = { From 0e151d9f5ed361780869ecf456ee81eeb8a8cac7 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 24 May 2021 17:00:17 +0800 Subject: [PATCH 14/23] naaj --- .../optimizer/PropagateEmptyRelation.scala | 15 +++---------- .../adaptive/AQEPropagateEmptyRelation.scala | 21 +++++++++++-------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 8f0ae71ed8ed..02b3d1d7d262 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -44,11 +44,6 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_ * - Generate(Explode) with all empty children. Others like Hive UDTF may return results. 
*/ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSupport { - /** - * At AQE side, we use the broadcast query stage to do the check - */ - protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = None - protected def isEmpty(plan: LogicalPlan): Boolean = plan match { case p: LocalRelation => p.data.isEmpty case _ => false @@ -66,11 +61,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] = plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) } - protected def propagateEmptyRelationAdvanced: PartialFunction[LogicalPlan, LogicalPlan] = { - case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) - if isRelationWithAllNullKeys.isDefined && isRelationWithAllNullKeys.get.apply(j.right) => - empty(j) - + protected def commonApplyFunc: PartialFunction[LogicalPlan, LogicalPlan] = { // Joins on empty LocalRelations generated from streaming sources are not eliminated // as stateful streaming joins need to perform other state management operations other than // just processing the input data. @@ -148,7 +139,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup * - Project/Filter/Sample with all empty children. */ object PropagateEmptyRelation extends PropagateEmptyRelationBase { - private def propagateEmptyRelationBasic: PartialFunction[LogicalPlan, LogicalPlan] = { + private def applyFunc: PartialFunction[LogicalPlan, LogicalPlan] = { case p: Union if p.children.exists(isEmpty) => val newChildren = p.children.filterNot(isEmpty) if (newChildren.isEmpty) { @@ -185,6 +176,6 @@ object PropagateEmptyRelation extends PropagateEmptyRelationBase { override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithPruning( _.containsAnyPattern(LOCAL_RELATION, TRUE_OR_FALSE_LITERAL), ruleId) { - propagateEmptyRelationBasic.orElse(propagateEmptyRelationAdvanced) + applyFunc.orElse(commonApplyFunc) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index ccbc0f50b61d..106f95623f34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.sql.catalyst.optimizer.PropagateEmptyRelationBase +import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys @@ -25,7 +26,6 @@ import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys * Rule [[PropagateEmptyRelationBase]] at AQE optimizer side. 
*/ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { - override protected def isEmpty(plan: LogicalPlan): Boolean = super.isEmpty(plan) || getRowCount(plan).contains(0) @@ -38,17 +38,20 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase { case _ => None } - override protected def isRelationWithAllNullKeys: Option[LogicalPlan => Boolean] = { - Some({ - case LogicalQueryStage(_, stage: BroadcastQueryStageExec) - if stage.resultOption.get().isDefined => - stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys - case _ => false - }) + private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan match { + case LogicalQueryStage(_, stage: BroadcastQueryStageExec) + if stage.resultOption.get().isDefined => + stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys + case _ => false + } + + private def eliminateSingleColumnNullAwareAntiJoin: PartialFunction[LogicalPlan, LogicalPlan] = { + case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if isRelationWithAllNullKeys(j.right) => + empty(j) } // TODO we need use transformUpWithPruning instead of transformUp def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp { - propagateEmptyRelationAdvanced + eliminateSingleColumnNullAwareAntiJoin.orElse(commonApplyFunc) } } From c086f72d967230cc2dbd3d4f63ec900ca495a86d Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 24 May 2021 17:04:38 +0800 Subject: [PATCH 15/23] comment --- .../sql/catalyst/optimizer/PropagateEmptyRelation.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 02b3d1d7d262..30c8467ea1d8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -131,12 +131,14 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup } /** - * Rule [[PropagateEmptyRelationBase]] at normal optimizer side. - * With the extra optimal pattern: + * Rule [[PropagateEmptyRelationBase]] at normal optimizer side with the extra pattern: * 1. Higher-node Logical Plans * - Union with all empty children. * 2. Unary-node Logical Plans * - Project/Filter/Sample with all empty children. + * + * The reason why we don't apply this rule at AQE optimizer side is: the benefit is not big enough + * and it may introduce extra exchanges. 
*/ object PropagateEmptyRelation extends PropagateEmptyRelationBase { private def applyFunc: PartialFunction[LogicalPlan, LogicalPlan] = { From 9b78ac05c434570ab80ad9fb421a55f7f557124d Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 24 May 2021 17:05:48 +0800 Subject: [PATCH 16/23] nit --- .../spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 30c8467ea1d8..a24f7f2d23e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral -import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ From 767dd92618461c2891e6aeb73d99f320f1601f17 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 24 May 2021 17:13:42 +0800 Subject: [PATCH 17/23] inline --- .../sql/catalyst/optimizer/PropagateEmptyRelation.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index a24f7f2d23e6..c156b7731526 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -100,8 +100,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup p } - case p: UnaryNode - if p.children.nonEmpty && p.children.forall(isEmpty) => p match { + case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) => p match { case _: Sort => empty(p) case _: GlobalLimit if !p.isStreaming => empty(p) case _: LocalLimit if !p.isStreaming => empty(p) @@ -162,8 +161,7 @@ object PropagateEmptyRelation extends PropagateEmptyRelationBase { } } - case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) && - canPropagate(p) => + case p: UnaryNode if p.children.nonEmpty && p.children.forall(isEmpty) && canPropagate(p) => empty(p) } From 47e0c3ade95beaf5801a3b6833cc8344f5f713ae Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 24 May 2021 19:23:49 +0800 Subject: [PATCH 18/23] test --- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 549ec352027f..ea3918ab1db9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1313,11 +1313,11 @@ class AdaptiveQueryExecSuite 
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { Seq( // left semi join and empty left side - ("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 LEFT SEMI JOIN testData2 t2 ON " + - "t1.key = t2.a", true), + ("SELECT * FROM (SELECT * FROM testData WHERE value = '0')t1 LEFT SEMI JOIN " + + "testData2 t2 ON t1.key = t2.a", true), // left anti join and empty left side - ("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 LEFT ANTI JOIN testData2 t2 ON " + - "t1.key = t2.a", true), + ("SELECT * FROM (SELECT * FROM testData WHERE value = '0')t1 LEFT ANTI JOIN " + + "testData2 t2 ON t1.key = t2.a", true), // left outer join and empty left side ("SELECT * FROM (SELECT * FROM testData WHERE key = 0)t1 LEFT JOIN testData2 t2 ON " + "t1.key = t2.a", true), From d9ca6dac29c8f69838faa9748b55f05d6203c012 Mon Sep 17 00:00:00 2001 From: ulysses-you Date: Mon, 24 May 2021 19:46:11 +0800 Subject: [PATCH 19/23] comment --- .../sql/catalyst/optimizer/PropagateEmptyRelation.scala | 9 ++++----- .../execution/adaptive/AQEPropagateEmptyRelation.scala | 6 +++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index c156b7731526..867dd0fe3361 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -26,12 +26,10 @@ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL} /** - * The rule used by both normal Optimizer and AQE Optimizer for: + * The base class of two rules in the normal and AQE Optimizer. It simplifies query plans with + * empty or non-empty relations: * 1. Binary-node Logical Plans * - Join with one or two empty children (including Intersect/Except). - * - Join is single column NULL-aware anti join (NAAJ) - * Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an - * empty [[LocalRelation]]. * - Left semi Join * Right side is non-empty and condition is empty. Eliminate join to its left side. * - Left anti join @@ -129,7 +127,8 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup } /** - * Rule [[PropagateEmptyRelationBase]] at normal optimizer side with the extra pattern: + * This rule runs in the normal optimizer and optimizes more cases + * compared to [[PropagateEmptyRelationBase]]: * 1. Higher-node Logical Plans * - Union with all empty children. * 2. Unary-node Logical Plans diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala index 106f95623f34..614fc78477c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala @@ -23,7 +23,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys /** - * Rule [[PropagateEmptyRelationBase]] at AQE optimizer side. + * This rule runs in the AQE optimizer and optimizes more cases + * compared to [[PropagateEmptyRelationBase]]: + * 1. 
From d9ca6dac29c8f69838faa9748b55f05d6203c012 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Mon, 24 May 2021 19:46:11 +0800
Subject: [PATCH 19/23] comment

---
 .../sql/catalyst/optimizer/PropagateEmptyRelation.scala   | 9 ++++-----
 .../execution/adaptive/AQEPropagateEmptyRelation.scala    | 6 +++++-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index c156b7731526..867dd0fe3361 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -26,12 +26,10 @@ import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TRUE_OR_FALSE_LITERAL}

 /**
- * The rule used by both normal Optimizer and AQE Optimizer for:
+ * The base class of two rules in the normal and AQE Optimizer. It simplifies query plans with
+ * empty or non-empty relations:
  * 1. Binary-node Logical Plans
  *    - Join with one or two empty children (including Intersect/Except).
- *    - Join is single column NULL-aware anti join (NAAJ)
- *      Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an
- *      empty [[LocalRelation]].
  *    - Left semi Join
  *      Right side is non-empty and condition is empty. Eliminate join to its left side.
  *    - Left anti join
@@ -129,7 +127,8 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
 }

 /**
- * Rule [[PropagateEmptyRelationBase]] at normal optimizer side with the extra pattern:
+ * This rule runs in the normal optimizer and optimizes more cases
+ * compared to [[PropagateEmptyRelationBase]]:
  * 1. Higher-node Logical Plans
  *    - Union with all empty children.
  * 2. Unary-node Logical Plans
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
index 106f95623f34..614fc78477c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
@@ -23,7 +23,11 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys

 /**
- * Rule [[PropagateEmptyRelationBase]] at AQE optimizer side.
+ * This rule runs in the AQE optimizer and optimizes more cases
+ * compared to [[PropagateEmptyRelationBase]]:
+ * 1. Join is single column NULL-aware anti join (NAAJ)
+ *    Broadcasted [[HashedRelation]] is [[HashedRelationWithAllNullKeys]]. Eliminate join to an
+ *    empty [[LocalRelation]].
  */
 object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
   override protected def isEmpty(plan: LogicalPlan): Boolean =

From 2fead86608f0e518092445b973e607ae54373cd3 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Mon, 24 May 2021 19:46:47 +0800
Subject: [PATCH 20/23] Relations

---
 .../org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
index 7ea543089808..b28626b28f07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
@@ -28,7 +28,7 @@ import org.apache.spark.util.Utils
  */
 class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] {
   private val defaultBatches = Seq(
-    Batch("Propagate Empty LocalRelation", Once,
+    Batch("Propagate Empty Relations", Once,
       AQEPropagateEmptyRelation,
       UpdateAttributeNullability),
     Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin)
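The next patch re-indents the guard that keeps this rule away from streaming plans: stateful streaming joins maintain state across micro-batches, so one batch's emptiness never justifies collapsing the join. A toy sketch of that guard (simplified types, inner-join case only, not Spark's classes):

    object StreamingGuardSketch {
      sealed trait Plan {
        def isStreaming: Boolean = false
        def children: Seq[Plan] = Nil
      }
      final case class Source(override val isStreaming: Boolean) extends Plan
      case object EmptyRelation extends Plan
      final case class Join(left: Plan, right: Plan) extends Plan {
        override def isStreaming: Boolean = left.isStreaming || right.isStreaming
        override def children: Seq[Plan] = Seq(left, right)
      }

      private def isEmpty(p: Plan): Boolean = p == EmptyRelation

      // Mirrors the guard in the hunk below: any streaming child disables the rewrite.
      def propagate(p: Plan): Plan = p match {
        case j: Join if !j.children.exists(_.isStreaming) && j.children.exists(isEmpty) =>
          EmptyRelation
        case other => other
      }

      def main(args: Array[String]): Unit = {
        val batchJoin = Join(EmptyRelation, Source(isStreaming = false))
        val streamJoin = Join(EmptyRelation, Source(isStreaming = true))
        assert(propagate(batchJoin) == EmptyRelation) // safe to collapse
        assert(propagate(streamJoin) eq streamJoin)   // stateful join must survive
      }
    }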
From 0754936c6a053f3f8b6d7a440fca398621c6eb21 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Mon, 24 May 2021 19:47:30 +0800
Subject: [PATCH 21/23] indentation

---
 .../spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 867dd0fe3361..f39e1adfe5f2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -63,7 +63,7 @@ abstract class PropagateEmptyRelationBase extends Rule[LogicalPlan] with CastSup
     // as stateful streaming joins need to perform other state management operations other than
     // just processing the input data.
     case p @ Join(_, _, joinType, conditionOpt, _)
-      if !p.children.exists(_.isStreaming) =>
+        if !p.children.exists(_.isStreaming) =>
       val isLeftEmpty = isEmpty(p.left)
       val isRightEmpty = isEmpty(p.right)
       val isFalseCondition = conditionOpt match {

From a6213ea8c920174f744521713929a36d029dc4d6 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Tue, 25 May 2021 09:05:58 +0800
Subject: [PATCH 22/23] test name

---
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala      | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index ea3918ab1db9..d67adee63b7f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1308,7 +1308,8 @@ class AdaptiveQueryExecSuite
     }
   }

-  test("SPARK-35455: Enhance EliminateUnnecessaryJoin - single join") {
+  test("SPARK-35455: Unify empty relation optimization between normal and AQE optimizer " +
+    "- single join") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       Seq(
@@ -1344,7 +1345,8 @@ class AdaptiveQueryExecSuite
     }
   }

-  test("SPARK-35455: Enhance EliminateUnnecessaryJoin - multi join") {
+  test("SPARK-35455: Unify empty relation optimization between normal and AQE optimizer " +
+    "- multi join") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       Seq(

From 624e45e27513933e9fe5615601d0dddac341a258 Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Tue, 25 May 2021 09:07:19 +0800
Subject: [PATCH 23/23] nit

---
 .../spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index f39e1adfe5f2..04c23ce29a8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -164,7 +164,7 @@ object PropagateEmptyRelation extends PropagateEmptyRelationBase {
       empty(p)
   }

-  // extract the pattern avoid conflict with propagateEmptyRelationAdvanced
+  // extract the pattern to avoid conflict with commonApplyFunc
   private def canPropagate(plan: LogicalPlan): Boolean = plan match {
     case _: Project => true
     case _: Filter => true