From 8b3be59b066512644bfba7d11556c08e64f98b22 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 29 Jun 2020 11:33:40 +0000 Subject: [PATCH 1/3] [SPARK-32056][SQL] Coalesce partitions for repartition by expressions when AQE is enabled This patch proposes to coalesce partitions for repartition by expressions without specifying number of partitions, when AQE is enabled. When repartition by some partition expressions, users can specify number of partitions or not. If the number of partitions is specified, we should not coalesce partitions because it breaks user expectation. But if without specifying number of partitions, AQE should be able to coalesce partitions as other shuffling. Yes. After this change, if users don't specify the number of partitions when repartitioning data by expressions, AQE will coalesce partitions. Added unit test. Closes #28900 from viirya/SPARK-32056. Authored-by: Liang-Chi Hsieh Signed-off-by: Wenchen Fan --- .../plans/logical/basicLogicalOperators.scala | 18 ++++- .../scala/org/apache/spark/sql/Dataset.scala | 54 +++++++------ .../spark/sql/execution/SparkStrategies.scala | 3 +- .../adaptive/AdaptiveQueryExecSuite.scala | 75 +++++++++++++++++-- 4 files changed, 119 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 54e5ff7aeb75..61ba6da3862a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.random.RandomSampler @@ -948,16 +949,18 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan) } /** - * This method repartitions data using [[Expression]]s into `numPartitions`, and receives + * This method repartitions data using [[Expression]]s into `optNumPartitions`, and receives * information about the number of partitions during execution. Used when a specific ordering or * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like - * `coalesce` and `repartition`. + * `coalesce` and `repartition`. If no `optNumPartitions` is given, by default it partitions data + * into `numShufflePartitions` defined in `SQLConf`, and could be coalesced by AQE. */ case class RepartitionByExpression( partitionExpressions: Seq[Expression], child: LogicalPlan, - numPartitions: Int) extends RepartitionOperation { + optNumPartitions: Option[Int]) extends RepartitionOperation { + val numPartitions = optNumPartitions.getOrElse(SQLConf.get.numShufflePartitions) require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.") val partitioning: Partitioning = { @@ -985,6 +988,15 @@ case class RepartitionByExpression( override def shuffle: Boolean = true } +object RepartitionByExpression { + def apply( + partitionExpressions: Seq[Expression], + child: LogicalPlan, + numPartitions: Int): RepartitionByExpression = { + RepartitionByExpression(partitionExpressions, child, Some(numPartitions)) + } +} + /** * A relation with one row. This is used in "SELECT ..." without a from clause. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3b6fd2fcc177..0dcdf0303869 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2991,17 +2991,9 @@ class Dataset[T] private[sql]( Repartition(numPartitions, shuffle = true, logicalPlan) } - /** - * Returns a new Dataset partitioned by the given partitioning expressions into - * `numPartitions`. The resulting Dataset is hash partitioned. - * - * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). - * - * @group typedrel - * @since 2.0.0 - */ - @scala.annotation.varargs - def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { + private def repartitionByExpression( + numPartitions: Option[Int], + partitionExprs: Seq[Column]): Dataset[T] = { // The underlying `LogicalPlan` operator special-cases all-`SortOrder` arguments. // However, we don't want to complicate the semantics of this API method. // Instead, let's give users a friendly error message, pointing them to the new method. @@ -3015,6 +3007,20 @@ class Dataset[T] private[sql]( } } + /** + * Returns a new Dataset partitioned by the given partitioning expressions into + * `numPartitions`. The resulting Dataset is hash partitioned. + * + * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL). + * + * @group typedrel + * @since 2.0.0 + */ + @scala.annotation.varargs + def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { + repartitionByExpression(Some(numPartitions), partitionExprs) + } + /** * Returns a new Dataset partitioned by the given partitioning expressions, using * `spark.sql.shuffle.partitions` as number of partitions. @@ -3027,7 +3033,20 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartition(partitionExprs: Column*): Dataset[T] = { - repartition(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*) + repartitionByExpression(None, partitionExprs) + } + + private def repartitionByRange( + numPartitions: Option[Int], + partitionExprs: Seq[Column]): Dataset[T] = { + require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") + val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { + case expr: SortOrder => expr + case expr: Expression => SortOrder(expr, Ascending) + }) + withTypedPlan { + RepartitionByExpression(sortOrder, logicalPlan, numPartitions) + } } /** @@ -3049,14 +3068,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T] = { - require(partitionExprs.nonEmpty, "At least one partition-by expression must be specified.") - val sortOrder: Seq[SortOrder] = partitionExprs.map(_.expr match { - case expr: SortOrder => expr - case expr: Expression => SortOrder(expr, Ascending) - }) - withTypedPlan { - RepartitionByExpression(sortOrder, logicalPlan, numPartitions) - } + repartitionByRange(Some(numPartitions), partitionExprs) } /** @@ -3078,7 +3090,7 @@ class Dataset[T] private[sql]( */ @scala.annotation.varargs def repartitionByRange(partitionExprs: Column*): Dataset[T] = { - repartitionByRange(sparkSession.sessionState.conf.numShufflePartitions, partitionExprs: _*) + repartitionByRange(None, partitionExprs) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 689d1eb62ffa..dbfd4bf7de44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -787,8 +787,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case r: logical.Range => execution.RangeExec(r) :: Nil case r: logical.RepartitionByExpression => + val canChangeNumParts = r.optNumPartitions.isEmpty exchange.ShuffleExchangeExec( - r.partitioning, planLater(r.child), noUserSpecifiedNumPartition = false) :: Nil + r.partitioning, planLater(r.child), canChangeNumParts) :: Nil case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil case r: LogicalRDD => RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 6d97a6bb47d0..9cfff10d59c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -874,18 +874,81 @@ class AdaptiveQueryExecSuite } } - test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") { + test("SPARK-31220, SPARK-32056: repartition by expression with AQE") { Seq(true, false).foreach { enableAQE => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, - SQLConf.SHUFFLE_PARTITIONS.key -> "6", - SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") { - val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + + val df1 = spark.range(10).repartition($"id") + val df2 = spark.range(10).repartition($"id" + 1) + + val partitionsNum1 = df1.rdd.collectPartitions().length + val partitionsNum2 = df2.rdd.collectPartitions().length + if (enableAQE) { - assert(partitionsNum === 7) + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled + val plan = df1.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + assert(shuffle(0).outputPartitioning.numPartitions == 10) } else { - assert(partitionsNum === 6) + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) } + + + // Don't coalesce partitions if the number of partitions is specified. + val df3 = spark.range(10).repartition(10, $"id") + val df4 = spark.range(10).repartition(10) + assert(df3.rdd.collectPartitions().length == 10) + assert(df4.rdd.collectPartitions().length == 10) + } + } + } + + test("SPARK-31220, SPARK-32056: repartition by range with AQE") { + Seq(true, false).foreach { enableAQE => + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + + val df1 = spark.range(10).toDF.repartitionByRange($"id".asc) + val df2 = spark.range(10).toDF.repartitionByRange(($"id" + 1).asc) + + val partitionsNum1 = df1.rdd.collectPartitions().length + val partitionsNum2 = df2.rdd.collectPartitions().length + + if (enableAQE) { + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled + val plan = df1.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + assert(shuffle(0).outputPartitioning.numPartitions == 10) + } else { + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) + } + + // Don't coalesce partitions if the number of partitions is specified. + val df3 = spark.range(10).repartitionByRange(10, $"id".asc) + assert(df3.rdd.collectPartitions().length == 10) } } } From 7b1c0064d0bb7db313265efa1a6b28c5c2577ee2 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 1 Jul 2020 16:14:51 -0700 Subject: [PATCH 2/3] [SPARK-32056][SQL][FOLLOW-UP] Coalesce partitions for repartiotion hint and sql when AQE is enabled As the followup of #28900, this patch extends coalescing partitions to repartitioning using hints and SQL syntax without specifying number of partitions, when AQE is enabled. When repartitionning using hints and SQL syntax, we should follow the shuffling behavior of repartition by expression/range to coalesce partitions when AQE is enabled. Yes. After this change, if users don't specify the number of partitions when repartitioning using `REPARTITION`/`REPARTITION_BY_RANGE` hint or `DISTRIBUTE BY`/`CLUSTER BY`, AQE will coalesce partitions. Unit tests. Closes #28952 from viirya/SPARK-32056-sql. Authored-by: Liang-Chi Hsieh Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/analysis/ResolveHints.scala | 16 ++-- .../catalyst/analysis/ResolveHintsSuite.scala | 4 +- .../spark/sql/execution/SparkSqlParser.scala | 2 +- .../sql/execution/SparkSqlParserSuite.scala | 6 +- .../adaptive/AdaptiveQueryExecSuite.scala | 81 +++++++++++++++---- 5 files changed, 78 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala index 81de086e78f9..4cbff62e16cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveHints.scala @@ -183,7 +183,7 @@ object ResolveHints { val hintName = hint.name.toUpperCase(Locale.ROOT) def createRepartitionByExpression( - numPartitions: Int, partitionExprs: Seq[Any]): RepartitionByExpression = { + numPartitions: Option[Int], partitionExprs: Seq[Any]): RepartitionByExpression = { val sortOrders = partitionExprs.filter(_.isInstanceOf[SortOrder]) if (sortOrders.nonEmpty) throw new IllegalArgumentException( s"""Invalid partitionExprs specified: $sortOrders @@ -208,11 +208,11 @@ object ResolveHints { throw new AnalysisException(s"$hintName Hint expects a partition number as a parameter") case param @ Seq(IntegerLiteral(numPartitions), _*) if shuffle => - createRepartitionByExpression(numPartitions, param.tail) + createRepartitionByExpression(Some(numPartitions), param.tail) case param @ Seq(numPartitions: Int, _*) if shuffle => - createRepartitionByExpression(numPartitions, param.tail) + createRepartitionByExpression(Some(numPartitions), param.tail) case param @ Seq(_*) if shuffle => - createRepartitionByExpression(conf.numShufflePartitions, param) + createRepartitionByExpression(None, param) } } @@ -224,7 +224,7 @@ object ResolveHints { val hintName = hint.name.toUpperCase(Locale.ROOT) def createRepartitionByExpression( - numPartitions: Int, partitionExprs: Seq[Any]): RepartitionByExpression = { + numPartitions: Option[Int], partitionExprs: Seq[Any]): RepartitionByExpression = { val invalidParams = partitionExprs.filter(!_.isInstanceOf[UnresolvedAttribute]) if (invalidParams.nonEmpty) { throw new AnalysisException(s"$hintName Hint parameter should include columns, but " + @@ -239,11 +239,11 @@ object ResolveHints { hint.parameters match { case param @ Seq(IntegerLiteral(numPartitions), _*) => - createRepartitionByExpression(numPartitions, param.tail) + createRepartitionByExpression(Some(numPartitions), param.tail) case param @ Seq(numPartitions: Int, _*) => - createRepartitionByExpression(numPartitions, param.tail) + createRepartitionByExpression(Some(numPartitions), param.tail) case param @ Seq(_*) => - createRepartitionByExpression(conf.numShufflePartitions, param) + createRepartitionByExpression(None, param) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala index d3bd5d07a093..513f1d001f75 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveHintsSuite.scala @@ -163,7 +163,7 @@ class ResolveHintsSuite extends AnalysisTest { checkAnalysis( UnresolvedHint("REPARTITION", Seq(UnresolvedAttribute("a")), table("TaBlE")), RepartitionByExpression( - Seq(AttributeReference("a", IntegerType)()), testRelation, conf.numShufflePartitions)) + Seq(AttributeReference("a", IntegerType)()), testRelation, None)) val e = intercept[IllegalArgumentException] { checkAnalysis( @@ -187,7 +187,7 @@ class ResolveHintsSuite extends AnalysisTest { "REPARTITION_BY_RANGE", Seq(UnresolvedAttribute("a")), table("TaBlE")), RepartitionByExpression( Seq(SortOrder(AttributeReference("a", IntegerType)(), Ascending)), - testRelation, conf.numShufflePartitions)) + testRelation, None)) val errMsg2 = "REPARTITION Hint parameter should include columns, but" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 4814f627e374..f4567b8c39b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -740,7 +740,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) { ctx: QueryOrganizationContext, expressions: Seq[Expression], query: LogicalPlan): LogicalPlan = { - RepartitionByExpression(expressions, query, conf.numShufflePartitions) + RepartitionByExpression(expressions, query, None) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index 343d3c1c1346..79dc516b226b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -209,20 +209,20 @@ class SparkSqlParserSuite extends AnalysisTest { assertEqual(s"$baseSql distribute by a, b", RepartitionByExpression(UnresolvedAttribute("a") :: UnresolvedAttribute("b") :: Nil, basePlan, - numPartitions = newConf.numShufflePartitions)) + None)) assertEqual(s"$baseSql distribute by a sort by b", Sort(SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil, global = false, RepartitionByExpression(UnresolvedAttribute("a") :: Nil, basePlan, - numPartitions = newConf.numShufflePartitions))) + None))) assertEqual(s"$baseSql cluster by a, b", Sort(SortOrder(UnresolvedAttribute("a"), Ascending) :: SortOrder(UnresolvedAttribute("b"), Ascending) :: Nil, global = false, RepartitionByExpression(UnresolvedAttribute("a") :: UnresolvedAttribute("b") :: Nil, basePlan, - numPartitions = newConf.numShufflePartitions))) + None))) } test("pipeline concatenation") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 9cfff10d59c3..c9992e484e67 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -23,7 +23,7 @@ import java.net.URI import org.apache.log4j.Level import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart} -import org.apache.spark.sql.{QueryTest, Row, SparkSession, Strategy} +import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy} import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan} import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan} import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION @@ -130,6 +130,17 @@ class AdaptiveQueryExecSuite assert(numShuffles === (numLocalReaders.length + numShufflesWithoutLocalReader)) } + private def checkInitialPartitionNum(df: Dataset[_], numPartition: Int): Unit = { + // repartition obeys initialPartitionNum when adaptiveExecutionEnabled + val plan = df.queryExecution.executedPlan + assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) + val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { + case s: ShuffleExchangeExec => s + } + assert(shuffle.size == 1) + assert(shuffle(0).outputPartitioning.numPartitions == numPartition) + } + test("Change merge join to broadcast join") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", @@ -892,14 +903,8 @@ class AdaptiveQueryExecSuite assert(partitionsNum1 < 10) assert(partitionsNum2 < 10) - // repartition obeys initialPartitionNum when adaptiveExecutionEnabled - val plan = df1.queryExecution.executedPlan - assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) - val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { - case s: ShuffleExchangeExec => s - } - assert(shuffle.size == 1) - assert(shuffle(0).outputPartitioning.numPartitions == 10) + checkInitialPartitionNum(df1, 10) + checkInitialPartitionNum(df2, 10) } else { assert(partitionsNum1 === 10) assert(partitionsNum2 === 10) @@ -933,14 +938,8 @@ class AdaptiveQueryExecSuite assert(partitionsNum1 < 10) assert(partitionsNum2 < 10) - // repartition obeys initialPartitionNum when adaptiveExecutionEnabled - val plan = df1.queryExecution.executedPlan - assert(plan.isInstanceOf[AdaptiveSparkPlanExec]) - val shuffle = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan.collect { - case s: ShuffleExchangeExec => s - } - assert(shuffle.size == 1) - assert(shuffle(0).outputPartitioning.numPartitions == 10) + checkInitialPartitionNum(df1, 10) + checkInitialPartitionNum(df2, 10) } else { assert(partitionsNum1 === 10) assert(partitionsNum2 === 10) @@ -966,4 +965,52 @@ class AdaptiveQueryExecSuite } } } + + test("SPARK-31220, SPARK-32056: repartition using sql and hint with AQE") { + Seq(true, false).foreach { enableAQE => + withTempView("test") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString, + SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true", + SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "10", + SQLConf.SHUFFLE_PARTITIONS.key -> "10") { + + spark.range(10).toDF.createTempView("test") + + val df1 = spark.sql("SELECT /*+ REPARTITION(id) */ * from test") + val df2 = spark.sql("SELECT /*+ REPARTITION_BY_RANGE(id) */ * from test") + val df3 = spark.sql("SELECT * from test DISTRIBUTE BY id") + val df4 = spark.sql("SELECT * from test CLUSTER BY id") + + val partitionsNum1 = df1.rdd.collectPartitions().length + val partitionsNum2 = df2.rdd.collectPartitions().length + val partitionsNum3 = df3.rdd.collectPartitions().length + val partitionsNum4 = df4.rdd.collectPartitions().length + + if (enableAQE) { + assert(partitionsNum1 < 10) + assert(partitionsNum2 < 10) + assert(partitionsNum3 < 10) + assert(partitionsNum4 < 10) + + checkInitialPartitionNum(df1, 10) + checkInitialPartitionNum(df2, 10) + checkInitialPartitionNum(df3, 10) + checkInitialPartitionNum(df4, 10) + } else { + assert(partitionsNum1 === 10) + assert(partitionsNum2 === 10) + assert(partitionsNum3 === 10) + assert(partitionsNum4 === 10) + } + + // Don't coalesce partitions if the number of partitions is specified. + val df5 = spark.sql("SELECT /*+ REPARTITION(10, id) */ * from test") + val df6 = spark.sql("SELECT /*+ REPARTITION_BY_RANGE(10, id) */ * from test") + assert(df5.rdd.collectPartitions().length == 10) + assert(df6.rdd.collectPartitions().length == 10) + } + } + } + } } From 62f8cc6b227b1f541704d4b26e0ac6141778308b Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 25 Nov 2020 02:02:32 +0000 Subject: [PATCH 3/3] [SPARK-33494][SQL][AQE] Do not use local shuffle reader for repartition This PR updates `ShuffleExchangeExec` to carry more information about how much we can change the partitioning. For `repartition(col)`, we should preserve the user-specified partitioning and don't apply the AQE local shuffle reader. Similar to `repartition(number, col)`, we should respect the user-specified partitioning. No a new test Closes #30432 from cloud-fan/aqe. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../spark/sql/execution/SparkStrategies.scala | 13 +++++---- .../adaptive/CoalesceShufflePartitions.scala | 9 +++++- .../adaptive/OptimizeLocalShuffleReader.scala | 13 ++++++--- .../exchange/ShuffleExchangeExec.scala | 28 +++++++++++++------ .../sql-tests/results/explain-aqe.sql.out | 20 ++++++------- .../sql-tests/results/explain.sql.out | 28 +++++++++---------- .../sql/SparkSessionExtensionSuite.scala | 6 ++-- .../adaptive/AdaptiveQueryExecSuite.scala | 10 +++++++ 8 files changed, 83 insertions(+), 44 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index dbfd4bf7de44..fe1f45e34ce5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.streaming.InternalOutputModes import org.apache.spark.sql.execution.aggregate.AggUtils import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.exchange.{REPARTITION, REPARTITION_WITH_NUM, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.execution.python._ import org.apache.spark.sql.execution.streaming._ @@ -754,7 +754,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Repartition(numPartitions, shuffle, child) => if (shuffle) { ShuffleExchangeExec(RoundRobinPartitioning(numPartitions), - planLater(child), noUserSpecifiedNumPartition = false) :: Nil + planLater(child), REPARTITION_WITH_NUM) :: Nil } else { execution.CoalesceExec(numPartitions, planLater(child)) :: Nil } @@ -787,9 +787,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case r: logical.Range => execution.RangeExec(r) :: Nil case r: logical.RepartitionByExpression => - val canChangeNumParts = r.optNumPartitions.isEmpty - exchange.ShuffleExchangeExec( - r.partitioning, planLater(r.child), canChangeNumParts) :: Nil + val shuffleOrigin = if (r.optNumPartitions.isEmpty) { + REPARTITION + } else { + REPARTITION_WITH_NUM + } + exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil case r: LogicalRDD => RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala index 096d65f16e42..11c8fee2ed29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala @@ -18,8 +18,10 @@ package org.apache.spark.sql.execution.adaptive import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REPARTITION, ShuffleExchangeLike} import org.apache.spark.sql.internal.SQLConf /** @@ -50,7 +52,7 @@ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPl val shuffleStages = collectShuffleStages(plan) // ShuffleExchanges introduced by repartition do not support changing the number of partitions. // We change the number of partitions in the stage only if all the ShuffleExchanges support it. - if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) { + if (!shuffleStages.forall(s => supportCoalesce(s.shuffle))) { plan } else { // `ShuffleQueryStageExec#mapStats` returns None when the input RDD has 0 partitions, @@ -85,6 +87,11 @@ case class CoalesceShufflePartitions(session: SparkSession) extends Rule[SparkPl } } } + + private def supportCoalesce(s: ShuffleExchangeLike): Boolean = { + s.outputPartitioning != SinglePartition && + (s.shuffleOrigin == ENSURE_REQUIREMENTS || s.shuffleOrigin == REPARTITION) + } } object CoalesceShufflePartitions { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index 31d1f34b64a6..f4891f8b7656 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -17,9 +17,10 @@ package org.apache.spark.sql.execution.adaptive +import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExchangeExec} +import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, EnsureRequirements, ShuffleExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.internal.SQLConf @@ -142,9 +143,13 @@ object OptimizeLocalShuffleReader { def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match { case s: ShuffleQueryStageExec => - s.shuffle.canChangeNumPartitions && s.mapStats.isDefined - case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _, _) => - s.shuffle.canChangeNumPartitions && s.mapStats.isDefined + s.mapStats.isDefined && supportLocalReader(s.shuffle) + case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs, _) => + s.mapStats.isDefined && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle) case _ => false } + + private def supportLocalReader(s: ShuffleExchangeLike): Boolean = { + s.outputPartitioning != SinglePartition && s.shuffleOrigin == ENSURE_REQUIREMENTS + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 24c736951fdc..25462c28a427 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -57,9 +57,9 @@ trait ShuffleExchangeLike extends Exchange { def numPartitions: Int /** - * Returns whether the shuffle partition number can be changed. + * The origin of this shuffle operator. */ - def canChangeNumPartitions: Boolean + def shuffleOrigin: ShuffleOrigin /** * The asynchronous job that materializes the shuffle. @@ -77,18 +77,30 @@ trait ShuffleExchangeLike extends Exchange { def runtimeStatistics: Statistics } +// Describes where the shuffle operator comes from. +sealed trait ShuffleOrigin + +// Indicates that the shuffle operator was added by the internal `EnsureRequirements` rule. It +// means that the shuffle operator is used to ensure internal data partitioning requirements and +// Spark is free to optimize it as long as the requirements are still ensured. +case object ENSURE_REQUIREMENTS extends ShuffleOrigin + +// Indicates that the shuffle operator was added by the user-specified repartition operator. Spark +// can still optimize it via changing shuffle partition number, as data partitioning won't change. +case object REPARTITION extends ShuffleOrigin + +// Indicates that the shuffle operator was added by the user-specified repartition operator with +// a certain partition number. Spark can't optimize it. +case object REPARTITION_WITH_NUM extends ShuffleOrigin + /** * Performs a shuffle that will result in the desired partitioning. */ case class ShuffleExchangeExec( override val outputPartitioning: Partitioning, child: SparkPlan, - noUserSpecifiedNumPartition: Boolean = true) extends ShuffleExchangeLike { - - // If users specify the num partitions via APIs like `repartition`, we shouldn't change it. - // For `SinglePartition`, it requires exactly one partition and we can't change it either. - def canChangeNumPartitions: Boolean = - noUserSpecifiedNumPartition && outputPartitioning != SinglePartition + shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS) + extends ShuffleExchangeLike { private lazy val writeMetrics = SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext) diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index d13649354ae4..86c005bca712 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 23 +-- Number of queries: 24 -- !query @@ -89,7 +89,7 @@ Results [2]: [key#x, max#x] (5) Exchange Input [2]: [key#x, max#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (6) HashAggregate Input [2]: [key#x, max#x] @@ -100,7 +100,7 @@ Results [2]: [key#x, max(val#x)#x AS max(val)#x] (7) Exchange Input [2]: [key#x, max(val)#x] -Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x] +Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [id=#x] (8) Sort Input [2]: [key#x, max(val)#x] @@ -158,7 +158,7 @@ Results [2]: [key#x, max#x] (5) Exchange Input [2]: [key#x, max#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (6) HashAggregate Input [2]: [key#x, max#x] @@ -245,7 +245,7 @@ Results [2]: [key#x, val#x] (9) Exchange Input [2]: [key#x, val#x] -Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, val#x, 4), ENSURE_REQUIREMENTS, [id=#x] (10) HashAggregate Input [2]: [key#x, val#x] @@ -613,7 +613,7 @@ Results [2]: [key#x, max#x] (5) Exchange Input [2]: [key#x, max#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (6) HashAggregate Input [2]: [key#x, max#x] @@ -647,7 +647,7 @@ Results [2]: [key#x, max#x] (11) Exchange Input [2]: [key#x, max#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (12) HashAggregate Input [2]: [key#x, max#x] @@ -730,7 +730,7 @@ Results [3]: [count#xL, sum#xL, count#xL] (3) Exchange Input [3]: [count#xL, sum#xL, count#xL] -Arguments: SinglePartition, true, [id=#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] (4) HashAggregate Input [3]: [count#xL, sum#xL, count#xL] @@ -776,7 +776,7 @@ Results [2]: [key#x, buf#x] (3) Exchange Input [2]: [key#x, buf#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (4) ObjectHashAggregate Input [2]: [key#x, buf#x] @@ -828,7 +828,7 @@ Results [2]: [key#x, min#x] (4) Exchange Input [2]: [key#x, min#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (5) Sort Input [2]: [key#x, min#x] diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 4c5b8407bdf2..e7eb99abbccd 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 23 +-- Number of queries: 24 -- !query @@ -92,7 +92,7 @@ Results [2]: [key#x, max#x] (6) Exchange Input [2]: [key#x, max#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (7) HashAggregate [codegen id : 2] Input [2]: [key#x, max#x] @@ -103,7 +103,7 @@ Results [2]: [key#x, max(val#x)#x AS max(val)#x] (8) Exchange Input [2]: [key#x, max(val)#x] -Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x] +Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), ENSURE_REQUIREMENTS, [id=#x] (9) Sort [codegen id : 3] Input [2]: [key#x, max(val)#x] @@ -160,7 +160,7 @@ Results [2]: [key#x, max#x] (6) Exchange Input [2]: [key#x, max#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (7) HashAggregate [codegen id : 2] Input [2]: [key#x, max#x] @@ -250,7 +250,7 @@ Results [2]: [key#x, val#x] (11) Exchange Input [2]: [key#x, val#x] -Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, val#x, 4), ENSURE_REQUIREMENTS, [id=#x] (12) HashAggregate [codegen id : 4] Input [2]: [key#x, val#x] @@ -469,7 +469,7 @@ Results [1]: [max#x] (10) Exchange Input [1]: [max#x] -Arguments: SinglePartition, true, [id=#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] (11) HashAggregate [codegen id : 2] Input [1]: [max#x] @@ -516,7 +516,7 @@ Results [1]: [max#x] (17) Exchange Input [1]: [max#x] -Arguments: SinglePartition, true, [id=#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] (18) HashAggregate [codegen id : 2] Input [1]: [max#x] @@ -600,7 +600,7 @@ Results [1]: [max#x] (9) Exchange Input [1]: [max#x] -Arguments: SinglePartition, true, [id=#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] (10) HashAggregate [codegen id : 2] Input [1]: [max#x] @@ -647,7 +647,7 @@ Results [2]: [sum#x, count#xL] (16) Exchange Input [2]: [sum#x, count#xL] -Arguments: SinglePartition, true, [id=#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] (17) HashAggregate [codegen id : 2] Input [2]: [sum#x, count#xL] @@ -713,7 +713,7 @@ Results [2]: [sum#x, count#xL] (7) Exchange Input [2]: [sum#x, count#xL] -Arguments: SinglePartition, true, [id=#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] (8) HashAggregate [codegen id : 2] Input [2]: [sum#x, count#xL] @@ -851,7 +851,7 @@ Results [2]: [key#x, max#x] (6) Exchange Input [2]: [key#x, max#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (7) HashAggregate [codegen id : 4] Input [2]: [key#x, max#x] @@ -943,7 +943,7 @@ Results [3]: [count#xL, sum#xL, count#xL] (4) Exchange Input [3]: [count#xL, sum#xL, count#xL] -Arguments: SinglePartition, true, [id=#x] +Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#x] (5) HashAggregate [codegen id : 2] Input [3]: [count#xL, sum#xL, count#xL] @@ -988,7 +988,7 @@ Results [2]: [key#x, buf#x] (4) Exchange Input [2]: [key#x, buf#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (5) ObjectHashAggregate Input [2]: [key#x, buf#x] @@ -1039,7 +1039,7 @@ Results [2]: [key#x, min#x] (5) Exchange Input [2]: [key#x, min#x] -Arguments: hashpartitioning(key#x, 4), true, [id=#x] +Arguments: hashpartitioning(key#x, 4), ENSURE_REQUIREMENTS, [id=#x] (6) Sort [codegen id : 2] Input [2]: [key#x, min#x] diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala index e5e8bc691779..8f5547e41ae1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} -import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike} +import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin} import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.COLUMN_BATCH_SIZE @@ -763,7 +763,9 @@ case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] { case class MyShuffleExchangeExec(delegate: ShuffleExchangeExec) extends ShuffleExchangeLike { override def numMappers: Int = delegate.numMappers override def numPartitions: Int = delegate.numPartitions - override def canChangeNumPartitions: Boolean = delegate.canChangeNumPartitions + override def shuffleOrigin: ShuffleOrigin = { + delegate.shuffleOrigin + } override def mapOutputStatisticsFuture: Future[MapOutputStatistics] = delegate.mapOutputStatisticsFuture override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[_] = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index c9992e484e67..cc8611ba4060 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1013,4 +1013,14 @@ class AdaptiveQueryExecSuite } } } + + test("SPARK-33494: Do not use local shuffle reader for repartition") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + val df = spark.table("testData").repartition('key) + df.collect() + // local shuffle reader breaks partitioning and shouldn't be used for repartition operation + // which is specified by users. + checkNumLocalShuffleReaders(df.queryExecution.executedPlan, numShufflesWithoutLocalReader = 1) + } + } }