From 8ef9f29c281073d07d18ef6a539da00f28aa8bd9 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Thu, 18 Jun 2020 10:14:55 +0800
Subject: [PATCH 01/10] init

---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 ++++++++++
 .../sql/execution/datasources/FilePartition.scala  |  5 +++--
 .../datasources/FileSourceStrategySuite.scala      | 12 ++++++++++++
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7f63d79a21ed6..a5a8c0bcb0412 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1176,6 +1176,14 @@ object SQLConf {
     .longConf
     .createWithDefault(4 * 1024 * 1024)
 
+  val FILES_MIN_PARTITION_NUM = buildConf("spark.sql.files.minPartitionNum")
+    .doc("The suggested (not guaranteed) minimum number of file split partitions. If not set, " +
+      "the default value is the default parallelism of the Spark cluster. This configuration is " +
+      "effective only when using file-based sources such as Parquet, JSON and ORC.")
+    .version("3.1.0")
+    .intConf
+    .createOptional
+
   val IGNORE_CORRUPT_FILES = buildConf("spark.sql.files.ignoreCorruptFiles")
     .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
       "encountering corrupted files and the contents that have been read will still be returned. " +
@@ -2782,6 +2790,8 @@ class SQLConf extends Serializable with Logging {
 
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
 
+  def filesMinPartitionNum: Option[Int] = getConf(FILES_MIN_PARTITION_NUM)
+
   def ignoreCorruptFiles: Boolean = getConf(IGNORE_CORRUPT_FILES)
 
   def ignoreMissingFiles: Boolean = getConf(IGNORE_MISSING_FILES)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
index b4fc94e097aa8..095940772ae78 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FilePartition.scala
@@ -88,9 +88,10 @@ object FilePartition extends Logging {
       selectedPartitions: Seq[PartitionDirectory]): Long = {
     val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
     val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
-    val defaultParallelism = sparkSession.sparkContext.defaultParallelism
+    val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
+      .getOrElse(sparkSession.sparkContext.defaultParallelism)
     val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum
-    val bytesPerCore = totalBytes / defaultParallelism
+    val bytesPerCore = totalBytes / minPartitionNum
     Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 812305ba24403..7d1e9ed1f8bf9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -526,6 +526,18 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
       assert(scan.head.scan.readSchema() == StructType(StructField("key", LongType) :: Nil))
     }
   }
+
+  test("Add spark.sql.files.minPartitionNum config") {
+    withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") {
+      val table =
+        createTable(files = Seq(
+          "file1" -> 1,
+          "file2" -> 1,
+          "file3" -> 1,
+        ))
+      assert(table.rdd.partitions.length == 1)
+    }
+  }
 }
 
 // Helpers for checking the arguments passed to the FileFormat.

From 4d776d3957be629822b61bcff7741e97aa681bcd Mon Sep 17 00:00:00 2001
From: ulysses
Date: Thu, 18 Jun 2020 10:42:24 +0800
Subject: [PATCH 02/10] fix

---
 .../datasources/FileSourceStrategySuite.scala | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 7d1e9ed1f8bf9..83df5c187a123 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -526,17 +526,17 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
       assert(scan.head.scan.readSchema() == StructType(StructField("key", LongType) :: Nil))
     }
   }
+  }
 
-  test("Add spark.sql.files.minPartitionNum config") {
-    withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") {
-      val table =
-        createTable(files = Seq(
-          "file1" -> 1,
-          "file2" -> 1,
-          "file3" -> 1,
-        ))
-      assert(table.rdd.partitions.length == 1)
-    }
+  test("Add spark.sql.files.minPartitionNum config") {
+    withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") {
+      val table =
+        createTable(files = Seq(
+          "file1" -> 1,
+          "file2" -> 1,
+          "file3" -> 1,
+        ))
+      assert(table.rdd.partitions.length == 1)
   }
 }
 

From 09bba5cb67be22aff62956e22c1f999de1ccf547 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Thu, 18 Jun 2020 11:18:50 +0800
Subject: [PATCH 03/10] fix

---
 .../sql/execution/datasources/FileSourceStrategySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 83df5c187a123..48505c9455894 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -534,7 +534,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
         createTable(files = Seq(
           "file1" -> 1,
           "file2" -> 1,
-          "file3" -> 1,
+          "file3" -> 1
         ))
       assert(table.rdd.partitions.length == 1)
     }

From 8f9b1c121905972343ae0e313ee706bfa8aef2d7 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Thu, 18 Jun 2020 16:16:13 +0800
Subject: [PATCH 04/10] add check

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a5a8c0bcb0412..c9e1a87b63da8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1182,6 +1182,7 @@ object SQLConf {
       "effective only when using file-based sources such as Parquet, JSON and ORC.")
     .version("3.1.0")
     .intConf
+    .checkValue(v => v > 0, "The min partition number must be a positive integer.")
     .createOptional
 
   val IGNORE_CORRUPT_FILES = buildConf("spark.sql.files.ignoreCorruptFiles")

From 7c667d8e6167a02d4608414b6df9f1897b2123cd Mon Sep 17 00:00:00 2001
From: ulysses
Date: Fri, 19 Jun 2020 07:46:37 +0800
Subject: [PATCH 05/10] add ut

---
 .../datasources/FileSourceStrategySuite.scala | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 48505c9455894..c7f847956838b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -538,6 +538,16 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
         ))
       assert(table.rdd.partitions.length == 1)
     }
+
+    withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "10") {
+      val table =
+        createTable(files = Seq(
+          "file1" -> 1,
+          "file2" -> 1,
+          "file3" -> 1
+        ))
+      assert(table.rdd.partitions.length == 3)
+    }
   }
 
 // Helpers for checking the arguments passed to the FileFormat.

From 600b933e209e60cebc9aaa91e0d4581c3f57c402 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Fri, 19 Jun 2020 09:56:52 +0800
Subject: [PATCH 06/10] update doc

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c9e1a87b63da8..b61324ff98ae5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1177,8 +1177,8 @@ object SQLConf {
     .createWithDefault(4 * 1024 * 1024)
 
   val FILES_MIN_PARTITION_NUM = buildConf("spark.sql.files.minPartitionNum")
-    .doc("The suggested (not guaranteed) minimum number of file split partitions. If not set, " +
-      "the default value is the default parallelism of the Spark cluster. This configuration is " +
+    .doc("The suggested (not guaranteed) minimum number of splitting file partitions. " +
+      "If not set, the default value is `spark.default.parallelism`. This configuration is " +
       "effective only when using file-based sources such as Parquet, JSON and ORC.")
     .version("3.1.0")
     .intConf

From f6d574a9a6093fbd5bd2f9c90c64fbd546e6a247 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Fri, 19 Jun 2020 17:31:28 +0800
Subject: [PATCH 07/10] add ut

---
 .../datasources/FileSourceStrategySuite.scala | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c7f847956838b..a5261d5802aa4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -528,7 +528,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
     }
   }
 
-  test("Add spark.sql.files.minPartitionNum config") {
+  test("SPARK-32019: Add spark.sql.files.minPartitionNum config") {
     withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "1") {
       val table =
         createTable(files = Seq(
@@ -548,6 +548,18 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
         ))
       assert(table.rdd.partitions.length == 3)
     }
+
+    withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "16") {
+      val partitions = (1 to 100).map(i => s"file$i" -> 128*1024*1024)
+      val table = createTable(files = partitions)
+      assert(table.rdd.partitions.length == 16)
+    }
+
+    withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "32") {
+      val partitions = (1 to 500).map(i => s"file$i" -> 4*1024*1024)
+      val table = createTable(files = partitions)
+      assert(table.rdd.partitions.length == 32)
+    }
   }
 
 // Helpers for checking the arguments passed to the FileFormat.
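The assertions of 16 and 32 partitions added in PATCH 07 are corrected to 100 and 50 in the patches that follow, because `minPartitionNum` only enters `FilePartition.maxSplitBytes` through the `bytesPerCore` term, and the resulting split size is still capped by `spark.sql.files.maxPartitionBytes`. Below is a minimal, self-contained sketch of that arithmetic — not the actual Spark code. It assumes the default 128MB `spark.sql.files.maxPartitionBytes` and 4MB `spark.sql.files.openCostInBytes`, approximates the greedy packing performed by `FilePartition.getFilePartitions`, and omits the file-splitting step (every file size used in these tests fits within a single split); the object and method names are hypothetical.

```scala
// Standalone sketch (hypothetical names); mirrors the formula introduced by
// this series in FilePartition.maxSplitBytes, under default config values.
object MinPartitionNumSketch {
  val maxPartitionBytes: Long = 128L * 1024 * 1024 // spark.sql.files.maxPartitionBytes default
  val openCostInBytes: Long = 4L * 1024 * 1024     // spark.sql.files.openCostInBytes default

  // Same math as FilePartition.maxSplitBytes after this patch: each file is
  // padded with the open cost, and the per-core share is clamped between the
  // open cost and the max partition size.
  def maxSplitBytes(fileSizes: Seq[Long], minPartitionNum: Int): Long = {
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / minPartitionNum
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }

  // Approximation of the greedy packing in FilePartition.getFilePartitions:
  // close the current partition once the next file would overflow the split
  // size; every packed file is padded with the open cost.
  def estimatePartitionCount(fileSizes: Seq[Long], minPartitionNum: Int): Int = {
    val splitSize = maxSplitBytes(fileSizes, minPartitionNum)
    var partitions = 0
    var currentSize = 0L
    fileSizes.foreach { size =>
      if (currentSize + size > splitSize) {
        partitions += 1
        currentSize = 0L
      }
      currentSize += size + openCostInBytes
    }
    if (currentSize > 0) partitions += 1
    partitions
  }

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024
    // 100 files of 128MB, minPartitionNum = 16: bytesPerCore is 825MB, the
    // split size is capped at 128MB, so each file is its own partition.
    println(estimatePartitionCount(Seq.fill(100)(128 * mb), 16)) // 100, not 16
    // 800 files of 4MB, minPartitionNum = 32: each file costs 8MB after the
    // open-cost padding, so 16 files fit per 128MB partition.
    println(estimatePartitionCount(Seq.fill(800)(4 * mb), 32)) // 50, not 32
  }
}
```

Running the sketch prints 100 and 50, matching the assertions after the fixes in PATCH 08 and PATCH 09 below.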
From 8f86b42ca745fe755a460eadb23485ccced90094 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Fri, 19 Jun 2020 19:21:22 +0800
Subject: [PATCH 08/10] fix

---
 .../sql/execution/datasources/FileSourceStrategySuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index a5261d5802aa4..5957b4188f5af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -552,11 +552,12 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
     withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "16") {
       val partitions = (1 to 100).map(i => s"file$i" -> 128*1024*1024)
       val table = createTable(files = partitions)
-      assert(table.rdd.partitions.length == 16)
+      // partition is limit by filesMaxPartitionBytes(128MB)
+      assert(table.rdd.partitions.length == 100)
     }
 
     withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "32") {
-      val partitions = (1 to 500).map(i => s"file$i" -> 4*1024*1024)
+      val partitions = (1 to 800).map(i => s"file$i" -> 4*1024*1024)
       val table = createTable(files = partitions)
       assert(table.rdd.partitions.length == 32)
     }

From 3a646a254690dce8e1c99dfd409066230a3a8c8a Mon Sep 17 00:00:00 2001
From: ulysses
Date: Fri, 19 Jun 2020 23:37:44 +0800
Subject: [PATCH 09/10] fix

---
 .../sql/execution/datasources/FileSourceStrategySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 5957b4188f5af..5a8052e932b8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -559,7 +559,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
     withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "32") {
       val partitions = (1 to 800).map(i => s"file$i" -> 4*1024*1024)
       val table = createTable(files = partitions)
-      assert(table.rdd.partitions.length == 32)
+      assert(table.rdd.partitions.length == 50)
     }
   }
 

From 1fb9dc651d5e1041fef8612bdbd3299dcea494a5 Mon Sep 17 00:00:00 2001
From: ulysses
Date: Sat, 20 Jun 2020 11:56:05 +0800
Subject: [PATCH 10/10] fix

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala  | 2 +-
 .../sql/execution/datasources/FileSourceStrategySuite.scala | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b61324ff98ae5..4388f6e12671a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1177,7 +1177,7 @@ object SQLConf {
     .createWithDefault(4 * 1024 * 1024)
 
   val FILES_MIN_PARTITION_NUM = buildConf("spark.sql.files.minPartitionNum")
-    .doc("The suggested (not guaranteed) minimum number of splitting file partitions. " +
+    .doc("The suggested (not guaranteed) minimum number of split file partitions. " +
       "If not set, the default value is `spark.default.parallelism`. This configuration is " +
       "effective only when using file-based sources such as Parquet, JSON and ORC.")
     .version("3.1.0")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 5a8052e932b8e..8a6e6b5ee801d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -550,14 +550,14 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
     }
 
     withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "16") {
-      val partitions = (1 to 100).map(i => s"file$i" -> 128*1024*1024)
+      val partitions = (1 to 100).map(i => s"file$i" -> 128 * 1024 * 1024)
       val table = createTable(files = partitions)
-      // partition is limit by filesMaxPartitionBytes(128MB)
+      // partition is limited by filesMaxPartitionBytes(128MB)
       assert(table.rdd.partitions.length == 100)
     }
 
     withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "32") {
-      val partitions = (1 to 800).map(i => s"file$i" -> 4*1024*1024)
+      val partitions = (1 to 800).map(i => s"file$i" -> 4 * 1024 * 1024)
       val table = createTable(files = partitions)
       assert(table.rdd.partitions.length == 50)
     }