
Commit 6175d6c

viirya authored and liancheng committed
[SPARK-8838] [SQL] Add config to enable/disable merging part-files when merging parquet schema
JIRA: https://issues.apache.org/jira/browse/SPARK-8838

Currently, all part-files are read when merging the Parquet schema. However, when there are many part-files and we can be sure that they all share the schema of their summary files, reading every part-file is wasteful. This patch therefore adds a configuration to skip part-files when merging the Parquet schema. In short, schema merging is still needed because different summary files may contain different schemas, but part-files that are known to match their summary files can be ignored.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#7238 from viirya/option_partfile_merge and squashes the following commits:

71d5b5f [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
8816f44 [Liang-Chi Hsieh] For comments.
dbc8e6b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
afc2fa1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
d4ed7e6 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
df43027 [Liang-Chi Hsieh] Get dataStatuses' partitions based on all paths.
4eb2f00 [Liang-Chi Hsieh] Use given parameter.
ea8f6e5 [Liang-Chi Hsieh] Correct the code comments.
a57be0e [Liang-Chi Hsieh] Merge part-files if there are no summary files.
47df981 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
4caf293 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
0e734e0 [Liang-Chi Hsieh] Use correct API.
3b6be5b [Liang-Chi Hsieh] Fix key not found.
4bdd7e0 [Liang-Chi Hsieh] Don't read footer files if we can skip them.
8bbebcb [Liang-Chi Hsieh] Figure out how to test the config.
bbd4ce7 [Liang-Chi Hsieh] Add config to enable/disable merging part-files when merging parquet schema.
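To make the new flag concrete, here is a minimal usage sketch (not part of the patch; it assumes a Spark 1.x SQLContext named sqlContext and an illustrative path):

  // Enable schema merging, but trust the summary files instead of reading
  // every part-file footer. Both keys are defined in SQLConf in this patch.
  sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
  sqlContext.setConf("spark.sql.parquet.respectSummaryFiles", "true")

  // Schema merging now only reads _metadata/_common_metadata summary files;
  // "/data/events" is a placeholder for a partitioned Parquet directory.
  val df = sqlContext.read.parquet("/data/events")
  df.printSchema()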
1 parent 5ba2d44 commit 6175d6c

File tree

3 files changed (+52, -1 lines)


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 7 additions & 0 deletions
@@ -247,6 +247,13 @@ private[spark] object SQLConf {
       "otherwise the schema is picked from the summary file or a random data file " +
       "if no summary file is available.")

+  val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
+    defaultValue = Some(false),
+    doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
+      "summary files and we will ignore them when merging schema. Otherwise, if this is " +
+      "false, which is the default, we will merge all part-files. This should be considered " +
+      "as expert-only option, and shouldn't be enabled before knowing what it means exactly.")
+
   val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
     defaultValue = Some(false),
     doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 18 additions & 1 deletion
@@ -124,6 +124,9 @@ private[sql] class ParquetRelation(
     .map(_.toBoolean)
     .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))

+  private val mergeRespectSummaries =
+    sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+
   private val maybeMetastoreSchema = parameters
     .get(ParquetRelation.METASTORE_SCHEMA)
     .map(DataType.fromJson(_).asInstanceOf[StructType])
@@ -421,7 +424,21 @@ private[sql] class ParquetRelation(
     val filesToTouch =
       if (shouldMergeSchemas) {
         // Also includes summary files, 'cause there might be empty partition directories.
-        (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+
+        // If mergeRespectSummaries config is true, we assume that all part-files are the same for
+        // their schema with summary files, so we ignore them when merging schema.
+        // If the config is disabled, which is the default setting, we merge all part-files.
+        // In this mode, we only need to merge schemas contained in all those summary files.
+        // You should enable this configuration only if you are very sure that for the parquet
+        // part-files to read there are corresponding summary files containing correct schema.
+
+        val needMerged: Seq[FileStatus] =
+          if (mergeRespectSummaries) {
+            Seq()
+          } else {
+            dataStatuses
+          }
+        (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
       } else {
         // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
         // don't have this.
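Distilled out of the diff, the footer-selection logic amounts to the sketch below. FileStatus is Hadoop's; the three listings are assumed to be precomputed by the relation, and the non-merging branch is paraphrased from the surrounding code rather than quoted from it:

  import org.apache.hadoop.fs.FileStatus

  def filesToTouch(
      shouldMergeSchemas: Boolean,
      mergeRespectSummaries: Boolean,
      metadataStatuses: Seq[FileStatus],
      commonMetadataStatuses: Seq[FileStatus],
      dataStatuses: Seq[FileStatus]): Seq[FileStatus] = {
    if (shouldMergeSchemas) {
      // Trusting the summaries means no part-file footers are read at all.
      val needMerged =
        if (mergeRespectSummaries) Seq.empty[FileStatus] else dataStatuses
      metadataStatuses ++ commonMetadataStatuses ++ needMerged
    } else {
      // No merging: try "_common_metadata", then "_metadata", then any part-file.
      commonMetadataStatuses.headOption
        .orElse(metadataStatuses.headOption)
        .orElse(dataStatuses.headOption)
        .toSeq
    }
  }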

sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala

Lines changed: 27 additions & 0 deletions
@@ -17,10 +17,13 @@

 package org.apache.spark.sql.parquet

+import java.io.File
+
 import org.apache.hadoop.fs.Path

 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.util.Utils

 /**
  * A test suite that tests various Parquet queries.
@@ -123,6 +126,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
     }
   }

+  test("Enabling/disabling merging partfiles when merging parquet schema") {
+    def testSchemaMerging(expectedColumnNumber: Int): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
+        sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
+        // delete summary files, so if we don't merge part-files, one column will not be included.
+        Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
+        Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
+        assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
+      }
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
+      testSchemaMerging(2)
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") {
+      testSchemaMerging(3)
+    }
+  }
+
   test("Enabling/disabling schema merging") {
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>
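A note on why the expected column counts differ: each write above produces one data column plus the partition column foo. Deleting foo=1's summary files means that, with respectSummaryFiles=true, only foo=2's summaries are consulted, yielding the two columns b and foo; with the flag off, the part-files under foo=1 are merged back in and column a reappears, for three columns in total. The summary files themselves are emitted by parquet-mr at job commit time and can be suppressed via a Hadoop property (a detail of parquet-mr, not of this patch; sketch assumes an active SparkContext named sc):

  // Disable _metadata/_common_metadata generation for subsequent Parquet writes.
  sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")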
