
Commit 40c37d6 (parent: 23083aa)
[SPARK-33617][SQL][FOLLOWUP] refine the default parallelism SQL config
### What changes were proposed in this pull request?

This is a followup of #30559. The default parallelism config in Spark core is not good, as it's unclear where it applies. To avoid inheriting this problem in Spark SQL, this PR refines the default parallelism SQL config to make it clear that it only applies to leaf nodes.

### Why are the changes needed?

Make the config clearer.

### Does this PR introduce _any_ user-facing change?

It changes an unreleased config.

### How was this patch tested?

Existing tests.

Closes #30736 from cloud-fan/follow.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
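A minimal sketch of the renamed config in action (not part of the diff; `spark` is an assumed existing `SparkSession` on a build that includes this change):

```scala
// Hedged sketch: the config now only affects leaf nodes that produce data,
// such as range(). Explicitly partitioned operators are untouched.
spark.conf.set("spark.sql.leafNodeDefaultParallelism", "4")
val df = spark.range(0, 1000)          // range is a leaf node
assert(df.rdd.getNumPartitions == 4)   // picks up the leaf-node default
```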

File tree: 9 files changed, +19 -23 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 5 additions & 6 deletions

@@ -374,12 +374,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)

-  val DEFAULT_PARALLELISM = buildConf("spark.sql.default.parallelism")
-    .doc("The number of parallelism for Spark SQL, the default value is " +
-      "`spark.default.parallelism`.")
+  val LEAF_NODE_DEFAULT_PARALLELISM = buildConf("spark.sql.leafNodeDefaultParallelism")
+    .doc("The default parallelism of Spark SQL leaf nodes that produce data, such as the file " +
+      "scan node, the local data scan node, the range node, etc. The default value of this " +
+      "config is 'SparkContext#defaultParallelism'.")
     .version("3.2.0")
     .intConf
-    .checkValue(_ > 0, "The value of spark.sql.default.parallelism must be positive.")
+    .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.")
     .createOptional

   val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")

@@ -3202,8 +3203,6 @@ class SQLConf extends Serializable with Logging {

   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)

-  def defaultParallelism: Option[Int] = getConf(DEFAULT_PARALLELISM)
-
   def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)

   def numShufflePartitions: Int = {
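Since the entry is `createOptional`, reading it yields an `Option[Int]`. A sketch of how callers resolve it, assuming an in-scope `SparkSession` named `spark`:

```scala
import org.apache.spark.sql.internal.SQLConf

// Sketch: resolving the optional entry with the documented fallback.
val leafParallelism: Int = spark.sessionState.conf
  .getConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM)     // Option[Int], None when unset
  .getOrElse(spark.sparkContext.defaultParallelism)   // SparkContext#defaultParallelism
```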

sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

Lines changed: 6 additions & 4 deletions

@@ -523,8 +523,7 @@ class SparkSession private(
    * @since 2.0.0
    */
   def range(start: Long, end: Long): Dataset[java.lang.Long] = {
-    range(start, end, step = 1,
-      numPartitions = sqlContext.conf.defaultParallelism.getOrElse(sparkContext.defaultParallelism))
+    range(start, end, step = 1, numPartitions = leafNodeDefaultParallelism)
   }

   /**
@@ -534,8 +533,7 @@ class SparkSession private(
    * @since 2.0.0
    */
   def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
-    range(start, end, step,
-      numPartitions = sqlContext.conf.defaultParallelism.getOrElse(sparkContext.defaultParallelism))
+    range(start, end, step, numPartitions = leafNodeDefaultParallelism)
   }

   /**
@@ -775,6 +773,10 @@ class SparkSession private(
       SparkSession.setActiveSession(old)
     }
   }
+
+  private[sql] def leafNodeDefaultParallelism: Int = {
+    conf.get(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM).getOrElse(sparkContext.defaultParallelism)
+  }
 }
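The new `leafNodeDefaultParallelism` helper centralizes the fallback. A hedged usage sketch:

```scala
// Sketch: with the config unset, the shorthand range() overloads fall back
// to SparkContext#defaultParallelism via the new private helper.
spark.conf.unset("spark.sql.leafNodeDefaultParallelism")
assert(spark.range(0, 1000).rdd.getNumPartitions ==
  spark.sparkContext.defaultParallelism)
```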

sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala

Lines changed: 1 addition & 2 deletions

@@ -50,8 +50,7 @@ case class LocalTableScanExec(
       sqlContext.sparkContext.emptyRDD
     } else {
       val numSlices = math.min(
-        unsafeRows.length,
-        conf.defaultParallelism.getOrElse(sqlContext.sparkContext.defaultParallelism))
+        unsafeRows.length, sqlContext.sparkSession.leafNodeDefaultParallelism)
       sqlContext.sparkContext.parallelize(unsafeRows, numSlices)
     }
   }
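Because of the `math.min`, a local scan never creates more slices than it has rows. A sketch, assuming a `spark` session with implicits imported:

```scala
import spark.implicits._

// Sketch: 3 rows cap the slice count even with a large leaf-node default.
spark.conf.set("spark.sql.leafNodeDefaultParallelism", "100")
val local = Seq(1, 2, 3).toDF("i")       // backed by LocalTableScanExec
assert(local.rdd.getNumPartitions <= 3)  // min(rowCount, configured default)
```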

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala

Lines changed: 1 addition & 1 deletion

@@ -67,7 +67,7 @@ case class CoalesceShufflePartitions(session: SparkSession) extends CustomShuffl
     // We fall back to Spark default parallelism if the minimum number of coalesced partitions
     // is not set, so to avoid perf regressions compared to no coalescing.
     val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
-      .orElse(conf.defaultParallelism).getOrElse(session.sparkContext.defaultParallelism)
+      .getOrElse(session.sparkContext.defaultParallelism)
     val partitionSpecs = ShufflePartitionsUtil.coalescePartitions(
       validMetrics.toArray,
       advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES),
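Shuffle coalescing is not a leaf node, so the SQL config no longer participates here; only the AQE-specific floor (when set) or the cluster default applies. A hedged sketch, assuming the config key matches the `COALESCE_PARTITIONS_MIN_PARTITION_NUM` entry referenced above:

```scala
// Sketch: the coalescing floor is now tuned only through its own AQE config.
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionNum", "8")
```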

sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

Lines changed: 1 addition & 2 deletions

@@ -382,8 +382,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   val start: Long = range.start
   val end: Long = range.end
   val step: Long = range.step
-  val numSlices: Int = range.numSlices.orElse(sqlContext.conf.defaultParallelism)
-    .getOrElse(sparkContext.defaultParallelism)
+  val numSlices: Int = range.numSlices.getOrElse(sqlContext.sparkSession.leafNodeDefaultParallelism)
   val numElements: BigInt = range.numElements
   val isEmptyRange: Boolean = start == end || (start < end ^ 0 < step)
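After this change, range slices resolve in order: explicit `numSlices`, then the leaf-node config, then the cluster default. A sketch:

```scala
// Sketch: an explicit partition count still wins over the config.
spark.conf.set("spark.sql.leafNodeDefaultParallelism", "4")
val explicit = spark.range(0, 100, 1, numPartitions = 2)
assert(explicit.rdd.getNumPartitions == 2)  // explicit numSlices takes precedence
```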

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

Lines changed: 1 addition & 2 deletions

@@ -738,8 +738,7 @@ case class AlterTableRecoverPartitionsCommand(
     // Set the number of parallelism to prevent following file listing from generating many tasks
     // in case of large #defaultParallelism.
     val numParallelism = Math.min(serializedPaths.length,
-      Math.min(spark.sessionState.conf.defaultParallelism
-        .getOrElse(spark.sparkContext.defaultParallelism), 10000))
+      Math.min(spark.sparkContext.defaultParallelism, 10000))
     // gather the fast stats for all the partitions otherwise Hive metastore will list all the
     // files for all the new partitions in sequential way, which is super slow.
     logInfo(s"Gather the fast stats in parallel using $numParallelism tasks.")
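This command is not a leaf scan, so its file-listing parallelism now derives from the cluster default (capped at 10000) rather than the removed SQL config. A hedged sketch of the statement this code path serves (`my_table` is hypothetical):

```scala
// Sketch: the command whose file listing uses the capped parallelism above.
spark.sql("ALTER TABLE my_table RECOVER PARTITIONS")
```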

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala

Lines changed: 1 addition & 2 deletions

@@ -89,8 +89,7 @@ object FilePartition extends Logging {
     val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
     val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
     val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
-      .orElse(sparkSession.sessionState.conf.defaultParallelism)
-      .getOrElse(sparkSession.sparkContext.defaultParallelism)
+      .getOrElse(sparkSession.leafNodeDefaultParallelism)
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
     val bytesPerCore = totalBytes / minPartitionNum
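File scans are leaf nodes, so they keep the new config in their fallback chain: `spark.sql.files.minPartitionNum`, then `spark.sql.leafNodeDefaultParallelism`, then the cluster default. A sketch:

```scala
// Sketch: an explicit file-scan floor takes precedence over the leaf-node default.
spark.conf.set("spark.sql.files.minPartitionNum", "16")
spark.conf.set("spark.sql.leafNodeDefaultParallelism", "4")  // shadowed for file scans here
```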

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SchemaMergeUtils.scala

Lines changed: 1 addition & 2 deletions

@@ -57,8 +57,7 @@ object SchemaMergeUtils extends Logging {
     // Set the number of partitions to prevent following schema reads from generating many tasks
     // in case of a small number of orc files.
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
-      sparkSession.sessionState.conf.defaultParallelism
-        .getOrElse(sparkSession.sparkContext.defaultParallelism))
+      sparkSession.sparkContext.defaultParallelism)

     val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
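Schema merging is a driver-side preparatory job rather than a leaf scan, so it now uses the cluster default directly. A hedged sketch of a read that exercises this path (the path is hypothetical):

```scala
// Sketch: merging schemas across files triggers the parallel schema read above.
val merged = spark.read
  .option("mergeSchema", "true")  // forces the schema-merge code path
  .parquet("/hypothetical/warehouse/events")
```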

sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala

Lines changed: 2 additions & 2 deletions

@@ -89,9 +89,9 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
     assert(LocalTableScanExec(Nil, Nil).execute().getNumPartitions == 0)
   }

-  test("SPARK-33617: spark.sql.default.parallelism effective for LocalTableScan") {
+  test("SPARK-33617: change default parallelism of LocalTableScan") {
     Seq(1, 4).foreach { minPartitionNum =>
-      withSQLConf(SQLConf.DEFAULT_PARALLELISM.key -> minPartitionNum.toString) {
+      withSQLConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> minPartitionNum.toString) {
         val df = spark.sql("SELECT * FROM VALUES (1), (2), (3), (4), (5), (6), (7), (8)")
         assert(df.rdd.partitions.length === minPartitionNum)
       }
