From 117dc6dbd44deac1cc19663f5433386efcc30b4e Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Fri, 13 Nov 2020 12:11:57 -0800 Subject: [PATCH 1/3] update --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/execution/QueryExecution.scala | 2 +- .../spark/sql/execution/SparkPlan.scala | 7 ++++++- .../adaptive/AdaptiveSparkPlanExec.scala | 2 +- .../execution/RemoveRedundantSortsSuite.scala | 20 +++++++++++++++++++ 5 files changed, 29 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5c17f0434bc7..43014feecfd8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1277,7 +1277,7 @@ object SQLConf { val REMOVE_REDUNDANT_SORTS_ENABLED = buildConf("spark.sql.execution.removeRedundantSorts") .internal() .doc("Whether to remove redundant physical sort node") - .version("3.1.0") + .version("2.4.8") .booleanConf .createWithDefault(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 77f7a4e553f0..49dca7e063c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -343,8 +343,8 @@ object QueryExecution { PlanDynamicPruningFilters(sparkSession), PlanSubqueries(sparkSession), RemoveRedundantProjects, - RemoveRedundantSorts, EnsureRequirements, + RemoveRedundantSorts, DisableUnnecessaryBucketedScan, ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules), CollapseCodegenStages(), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index ead8c0003111..062aa69b3adb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -135,7 +135,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ def longMetric(name: String): SQLMetric = metrics(name) // TODO: Move to `DistributedPlan` - /** Specifies how data is partitioned across different nodes in the cluster. */ + /** + * Specifies how data is partitioned across different nodes in the cluster. + * Note this method may fail if it is invoked before `EnsureRequirements` is applied + * since `PartitioningCollection` requires all its partitionings to have + * the same number of partitions. + */ def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH! /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index 0865e42b440d..570edbf5f78a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -88,8 +88,8 @@ case class AdaptiveSparkPlanExec( // Exchange nodes) after running these rules. private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq( RemoveRedundantProjects, - RemoveRedundantSorts, EnsureRequirements, + RemoveRedundantSorts, DisableUnnecessaryBucketedScan ) ++ context.session.sessionState.queryStagePrepRules diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala index 54c5a3344190..936b0274f441 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala @@ -135,6 +135,26 @@ abstract class RemoveRedundantSortsSuiteBase } } } + + test("shuffled join with different left and right side partition numbers") { + withTempView("t1", "t2") { + spark.range(0, 100, 1, 2).select('id as "key").createOrReplaceTempView("t1") + (0 to 100).toDF("key").createOrReplaceTempView("t2") + + // left side partitioning: RangePartitioning(key ASC, 2) + // right side partitioning: UnknownPartitioning(0) + val queryTemplate = """ + |SELECT /*+ %s(t1) */ t1.key + |FROM t1 JOIN t2 ON t1.key = t2.key + |WHERE t1.key > 10 AND t2.key < 50 + |ORDER BY t1.key ASC + """.stripMargin + + Seq(("MERGE", 3), ("SHUFFLE_HASH", 1)).foreach { case (hint, count) => + checkSorts(queryTemplate.format(hint), count, count) + } + } + } } class RemoveRedundantSortsSuite extends RemoveRedundantSortsSuiteBase From 9d28f4049e4f58919431307d372b8cb96d3a2b8e Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Tue, 17 Nov 2020 17:46:28 -0800 Subject: [PATCH 2/3] address comments --- .../org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../spark/sql/execution/QueryExecution.scala | 2 ++ .../sql/execution/RemoveRedundantSortsSuite.scala | 15 +++++++++++---- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 43014feecfd8..5c17f0434bc7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1277,7 +1277,7 @@ object SQLConf { val REMOVE_REDUNDANT_SORTS_ENABLED = buildConf("spark.sql.execution.removeRedundantSorts") .internal() .doc("Whether to remove redundant physical sort node") - .version("2.4.8") + .version("3.1.0") .booleanConf .createWithDefault(true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 49dca7e063c0..040d1f36ed8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -344,6 +344,8 @@ object QueryExecution { PlanSubqueries(sparkSession), RemoveRedundantProjects, EnsureRequirements, + // `RemoveRedundantSorts` needs to be added before `EnsureRequirements` to guarantee the same + // number of partitions when instantiating PartitioningCollection. RemoveRedundantSorts, DisableUnnecessaryBucketedScan, ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala index 936b0274f441..fba5cb6790ba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{DataFrame, QueryTest} +import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} +import org.apache.spark.sql.execution.joins.ShuffledJoin import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -136,13 +138,11 @@ abstract class RemoveRedundantSortsSuiteBase } } - test("shuffled join with different left and right side partition numbers") { + test("SPARK-33472: shuffled join with different left and right side partition numbers") { withTempView("t1", "t2") { spark.range(0, 100, 1, 2).select('id as "key").createOrReplaceTempView("t1") (0 to 100).toDF("key").createOrReplaceTempView("t2") - // left side partitioning: RangePartitioning(key ASC, 2) - // right side partitioning: UnknownPartitioning(0) val queryTemplate = """ |SELECT /*+ %s(t1) */ t1.key |FROM t1 JOIN t2 ON t1.key = t2.key @@ -151,7 +151,14 @@ abstract class RemoveRedundantSortsSuiteBase """.stripMargin Seq(("MERGE", 3), ("SHUFFLE_HASH", 1)).foreach { case (hint, count) => - checkSorts(queryTemplate.format(hint), count, count) + val query = queryTemplate.format(hint) + val df = sql(query) + val sparkPlan = df.queryExecution.sparkPlan + val join = sparkPlan.collect { case j: ShuffledJoin => j }.head + val range = sparkPlan.collect { case r: RangeExec => r }.head + assert(join.left.outputPartitioning == range.outputPartitioning) + assert(join.right.outputPartitioning == UnknownPartitioning(0)) + checkSorts(query, count, count) } } } From 4e684df701c9421eba105d469b0338a5320ce42c Mon Sep 17 00:00:00 2001 From: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com> Date: Wed, 18 Nov 2020 20:37:35 -0800 Subject: [PATCH 3/3] fix test --- .../spark/sql/execution/RemoveRedundantSortsSuite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala index fba5cb6790ba..751078d08fda 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.{DataFrame, QueryTest} -import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning +import org.apache.spark.sql.catalyst.plans.physical.{RangePartitioning, UnknownPartitioning} import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.execution.joins.ShuffledJoin import org.apache.spark.sql.internal.SQLConf @@ -155,8 +155,9 @@ abstract class RemoveRedundantSortsSuiteBase val df = sql(query) val sparkPlan = df.queryExecution.sparkPlan val join = sparkPlan.collect { case j: ShuffledJoin => j }.head - val range = sparkPlan.collect { case r: RangeExec => r }.head - assert(join.left.outputPartitioning == range.outputPartitioning) + val leftPartitioning = join.left.outputPartitioning + assert(leftPartitioning.isInstanceOf[RangePartitioning]) + assert(leftPartitioning.numPartitions == 2) assert(join.right.outputPartitioning == UnknownPartitioning(0)) checkSorts(query, count, count) }