sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala (6 additions, 0 deletions)
@@ -247,6 +247,12 @@ private[spark] object SQLConf {
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")

val PARQUET_SCHEMA_SKIP_MERGE_PARTFILES = booleanConf("spark.sql.parquet.skipMergePartFiles",
Contributor:
I'd propose renaming this configuration to spark.sql.parquet.respectSummaryFiles.

defaultValue = Some(false),
doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
"summary files and we will ignore them when merging schema. Otherwise, if this is " +
"false, which is the default, we will merge all part-files.")

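As context for this new flag, a minimal usage sketch, assuming the key proposed in this PR (spark.sql.parquet.skipMergePartFiles, before any renaming) together with the existing spark.sql.parquet.mergeSchema key; the table path is hypothetical:

    // Enable schema merging, but derive the merged schema from summary files only,
    // skipping the per-partition part-files.
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    sqlContext.setConf("spark.sql.parquet.skipMergePartFiles", "true")
    val df = sqlContext.read.parquet("/path/to/partitioned/table")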
val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
defaultValue = Some(false),
doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
@@ -120,6 +120,9 @@ private[sql] class ParquetRelation2(
.map(_.toBoolean)
.getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))

private val skipMergePartFiles =
sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES)

private val maybeMetastoreSchema = parameters
.get(ParquetRelation2.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])
@@ -407,7 +410,17 @@ private[sql] class ParquetRelation2(
val filesToTouch =
if (shouldMergeSchemas) {
// Also includes summary files, 'cause there might be empty partition directories.
(metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq

// If the skipMergePartFiles config is true, we assume that the schemas of all part-files
// are consistent with the summary files, so we ignore the part-files when merging the
// schema. If the config is false, which is the default setting, we merge all part-files.
val needMerged: Seq[FileStatus] =
Contributor (edited):
So, if I understand this correctly, skipMergePartFiles is useful when dealing with partitioned tables, where each partition directory may contain its own summary files, namely something like this:

.
├── p=1
│   ├── _metadata
│   ├── _common_metadata
│   └── ...
└── p=2
    ├── _common_metadata
    ├── _metadata
    └── ...

Basically in this mode, we only need to merge the schemas contained in all those summary files. And for non-partitioned tables, skipMergePartFiles essentially disables shouldMergeSchemas, is that correct?
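To make the layout above concrete, a rough sketch (table path hypothetical) that reports which partition directories carry summary files, using the plain Hadoop FileSystem API:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Print, for each partition directory, whether the Parquet summary files exist.
    val tablePath = new Path("/tmp/parquet_table")
    val fs = FileSystem.get(new Configuration())
    fs.listStatus(tablePath).filter(_.isDirectory).foreach { partDir =>
      val hasMetadata = fs.exists(new Path(partDir.getPath, "_metadata"))
      val hasCommonMetadata = fs.exists(new Path(partDir.getPath, "_common_metadata"))
      println(s"${partDir.getPath}: _metadata=$hasMetadata, _common_metadata=$hasCommonMetadata")
    }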

Contributor:
Could you please update the comment to reflect the above fact (if that's correct)? It took me a while to realize this.

Contributor:

After thinking about this more, I feel that this configuration can be pretty dangerous... Actually it's not unusual for various tools/systems to fail to write Parquet summary files after writing the actual Parquet data. There are at least two common cases:

  • Hive uses NullOutputCommitter, which bypasses ParquetOutputCommitter.commitJob(), where summary files are written. So Parquet tables written by Hive never have summary files. That's OK for Hive, since Hive always stores the schema in the metastore and doesn't take schema evolution into account at all.
  • When appending Parquet data to a directory containing existing data, if the newly written files have different user-defined metadata from the old ones (different values assigned to the same key), Parquet simply gives up writing summary files (see 1, 2, and PARQUET-194).

So, it's quite possible that we have multiple partition directories, and some of them don't have summary files. With this flag enabled, part-files in those directories are completely ignored. Actually this is exactly the same situation illustrated in the test case you added.

Contributor:

However, this feature is still pretty appealing. Can we find out all partition directories containing no summary files and always merge part-files within these directories?
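One possible shape of that idea, sketched here as a suggestion rather than anything in this PR: index the summary files by parent directory, and keep for merging exactly those part-files whose directory has no summary file.

    import org.apache.hadoop.fs.{FileStatus, Path}

    // Sketch: even with skipMergePartFiles enabled, part-files that live in a
    // directory without any summary file still participate in schema merging.
    def partFilesToMerge(
        dataStatuses: Seq[FileStatus],
        summaryStatuses: Seq[FileStatus]): Seq[FileStatus] = {
      val dirsWithSummaries: Set[Path] = summaryStatuses.map(_.getPath.getParent).toSet
      dataStatuses.filterNot(f => dirsWithSummaries.contains(f.getPath.getParent))
    }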

Member Author:

@liancheng You are right. This is a very dangerous configuration, so it should be disabled by default, and we should be very sure that Parquet summary files exist along with the Parquet data before enabling it for better performance.

I had not thought about finding the part-files that have no summary files alongside them and merging those, but it can be useful for dealing with the situations you mentioned, although it would degrade performance a bit.

I will try to add this feature and update this PR later.

if (skipMergePartFiles) {
Seq()
} else {
dataStatuses
}
(metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
Contributor:
Summary files are not always available. It's quite possible that metadataStatuses and commonMetadataStatuses are both empty.

} else {
// Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
// don't have this.
@@ -17,11 +17,14 @@

package org.apache.spark.sql.parquet

import java.io.File

import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
import org.apache.spark.util.Utils

/**
* A test suite that tests various Parquet queries.
@@ -124,6 +127,30 @@ class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
}
}

test("Enabling/disabling merging partfiles when merging parquet schema") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
// Delete summary files, so that if we don't merge part-files, one column will not be included.
Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
}
}

withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES.key -> "true") {
testSchemaMerging(2)
}

withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_SKIP_MERGE_PARTFILES.key -> "false") {
testSchemaMerging(3)
}
}

test("Enabling/disabling schema merging") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>