
Commit 1d1eacd

wangyum authored and cloud-fan committed
[SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled
### What changes were proposed in this pull request?

This PR makes `repartition`/`DISTRIBUTE BY` obey [initialPartitionNum](https://github.com/apache/spark/blob/af4248b2d661d04fec89b37857a47713246d9465/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L446-L455) when adaptive execution is enabled.

### Why are the changes needed?

To make `DISTRIBUTE BY`/`GROUP BY` use the same number of shuffle partitions. How to reproduce:

```scala
spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
```

Before this PR:

```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

After this PR:

```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #27986 from wangyum/SPARK-31220.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
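For a quick, hands-on check of the same behavior from the DataFrame API, the sketch below mirrors the configuration used in the new unit test. It is illustrative only (it assumes a `spark-shell` session where `spark` is already defined) and is not part of this commit:

```scala
// Illustrative sketch (not from this commit): observe the shuffle partition count
// that repartition-by-column uses once this change is in place.
spark.conf.set("spark.sql.shuffle.partitions", "6")
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "7")

import spark.implicits._

// The exchange inserted for repartition($"id") is non-coalescible (the `false` flag in the
// explain output above), so the runtime partition count should match the planned one:
// 7 with adaptive execution enabled; 6 (spark.sql.shuffle.partitions) with it disabled.
val numParts = spark.range(10).repartition($"id").rdd.getNumPartitions
println(s"repartition produced $numParts partitions")
```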
1 parent: 717ec5e

3 files changed: +27 −12 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 4 deletions
```diff
@@ -2784,7 +2784,15 @@ class SQLConf extends Serializable with Logging {
 
   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
 
-  def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
+  def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
+
+  def numShufflePartitions: Int = {
+    if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
+      getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
+    } else {
+      defaultNumShufflePartitions
+    }
+  }
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
@@ -2797,9 +2805,6 @@ class SQLConf extends Serializable with Logging {
 
   def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)
 
-  def initialShufflePartitionNum: Int =
-    getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)
-
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
   def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
```

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 2 additions & 8 deletions
```diff
@@ -35,12 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int =
-    if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
-      conf.initialShufflePartitionNum
-    } else {
-      conf.numShufflePartitions
-    }
 
   private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
     val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
@@ -57,7 +51,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         BroadcastExchangeExec(mode, child)
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
-          .getOrElse(defaultNumPreShufflePartitions)
+          .getOrElse(conf.numShufflePartitions)
         ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
     }
 
@@ -95,7 +89,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
            // expected number of shuffle partitions. However, if it's smaller than
            // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
            // expected number of shuffle partitions.
-            math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
+            math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
          } else {
            childrenNumPartitions.max
          }
```
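With this change, `EnsureRequirements` no longer carries its own `defaultNumPreShufflePartitions`: a new shuffle exchange uses the distribution's own `requiredNumPartitions` when present and the AQE-aware `conf.numShufflePartitions` otherwise, while the floor check above keeps comparing against `conf.defaultNumShufflePartitions` so it is not inflated by a large `initialPartitionNum`. A simplified sketch of that selection (not the actual rule code; the example values follow the PR description):

```scala
// Simplified sketch: how the partition count for a newly inserted shuffle exchange is chosen.
def partitionsForExchange(requiredNumPartitions: Option[Int], confNumShufflePartitions: Int): Int =
  requiredNumPartitions.getOrElse(confNumShufflePartitions)

// DISTRIBUTE BY produces a clustered distribution with no fixed partition requirement,
// so with the settings from the description it now gets 1000 instead of 200.
assert(partitionsForExchange(None, 1000) == 1000)
// A distribution that pins its partition count (e.g. SinglePartition requires 1) keeps it.
assert(partitionsForExchange(Some(1), 1000) == 1)
```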

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 16 additions & 0 deletions
```diff
@@ -1021,4 +1021,20 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
+    Seq(true, false).foreach { enableAQE =>
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
+        SQLConf.SHUFFLE_PARTITIONS.key -> "6",
+        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
+        val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length
+        if (enableAQE) {
+          assert(partitionsNum === 7)
+        } else {
+          assert(partitionsNum === 6)
+        }
+      }
+    }
+  }
 }
```
