7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -247,6 +247,13 @@ private[spark] object SQLConf {
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")

val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
defaultValue = Some(false),
doc = "When true, we make assumption that all part-files of Parquet are consistent with " +
"summary files and we will ignore them when merging schema. Otherwise, if this is " +
"false, which is the default, we will merge all part-files. This should be considered " +
"as expert-only option, and shouldn't be enabled before knowing what it means exactly.")

val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
defaultValue = Some(false),
doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
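For context, a minimal usage sketch of the new option (illustrative, not part of the patch: the SQLContext setup and table path are placeholders, and the merge flag's string key is assumed to be spark.sql.parquet.mergeSchema):

// Assumes an existing SQLContext named sqlContext; the path is a placeholder.
// With both flags enabled, schema merging reads only the summary files
// (_metadata / _common_metadata) rather than the footer of every part-file.
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
sqlContext.setConf("spark.sql.parquet.respectSummaryFiles", "true")
val df = sqlContext.read.parquet("/path/to/partitioned/table")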
@@ -124,6 +124,9 @@ private[sql] class ParquetRelation(
.map(_.toBoolean)
.getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))

private val mergeRespectSummaries =
sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)

private val maybeMetastoreSchema = parameters
.get(ParquetRelation.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])
@@ -421,7 +424,21 @@ private[sql] class ParquetRelation(
val filesToTouch =
if (shouldMergeSchemas) {
// Also include summary files, because there might be empty partition directories.
(metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq

// If the mergeRespectSummaries config is true, we assume that all part-files are
// consistent with the summary files in terms of schema, so part-files are ignored when
// merging schemas; in that mode only the schemas contained in the summary files need
// to be merged.
// If the config is false, which is the default, all part-files are merged as well.
// Enable this configuration only if you are sure that every Parquet part-file to be
// read has a corresponding summary file with the correct schema.

val needMerged: Seq[FileStatus] =
if (mergeRespectSummaries) {
Seq()
} else {
dataStatuses
}
(metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
} else {
// Tries any "_common_metadata" first. Parquet files written by old versions of Parquet
// don't have this.
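The comment above warns that the flag is safe only when every part-file has a matching summary file. A hedged sketch of one way to check this with the Hadoop FileSystem API (the path is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// List one partition directory and confirm Parquet summary files are present
// before enabling spark.sql.parquet.respectSummaryFiles.
val dir = new Path("hdfs:///path/to/table/foo=1")
val fs = dir.getFileSystem(new Configuration())
val names = fs.listStatus(dir).map(_.getPath.getName).toSet
val hasSummaries = names.contains("_metadata") || names.contains("_common_metadata")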
@@ -17,10 +17,13 @@

package org.apache.spark.sql.parquet

import java.io.File

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
import org.apache.spark.util.Utils

/**
* A test suite that tests various Parquet queries.
Expand Down Expand Up @@ -123,6 +126,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
}
}

test("Enabling/disabling merging partfiles when merging parquet schema") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
// Delete summary files under foo=1, so that if part-files are not merged, one column
// ("a") will be missing.
Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
}
}

withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
testSchemaMerging(2)
}

withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") {
testSchemaMerging(3)
}
}
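// For reference (not part of the patch; part-file names are illustrative), the layout
// the test above produces and why the expected column counts differ:
//
//   basePath/foo=1/part-r-*.parquet       schema: a   (summary files deleted above)
//   basePath/foo=2/_metadata              schema: b
//   basePath/foo=2/_common_metadata       schema: b
//   basePath/foo=2/part-r-*.parquet       schema: b
//
// respectSummaryFiles = true  -> only summaries are merged: b + partition column foo = 2
// respectSummaryFiles = false -> part-files are merged too: a + b + foo = 3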

test("Enabling/disabling schema merging") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>