sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala (6 additions, 0 deletions)
@@ -247,6 +247,12 @@ private[spark] object SQLConf {
"otherwise the schema is picked from the summary file or a random data file " +
"if no summary file is available.")

  val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
    defaultValue = Some(false),
    doc = "When true, we assume that all part-files of Parquet are consistent with the " +
      "summary files and ignore them when merging the schema. Otherwise, if this is " +
      "false, which is the default, we merge all part-files.")

val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
defaultValue = Some(false),
doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
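For readers trying out this change: the new flag is an ordinary SQL conf, so it can be toggled per session. A minimal sketch, assuming an existing SQLContext named sqlContext (not part of this patch):

  // Sketch: enable the new behavior for the current session.
  sqlContext.setConf("spark.sql.parquet.respectSummaryFiles", "true")
  // Equivalent SQL form:
  sqlContext.sql("SET spark.sql.parquet.respectSummaryFiles=true")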
@@ -44,7 +44,7 @@ import org.apache.spark.rdd.RDD._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{SqlNewHadoopPartition, SqlNewHadoopRDD}
import org.apache.spark.sql.execution.datasources.PartitionSpec
import org.apache.spark.sql.execution.datasources.{PartitionSpec, PartitioningUtils}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -125,6 +125,9 @@ private[sql] class ParquetRelation(
.map(_.toBoolean)
.getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))

private val mergeRespectSummaries =
sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)

private val maybeMetastoreSchema = parameters
.get(ParquetRelation.METASTORE_SCHEMA)
.map(DataType.fromJson(_).asInstanceOf[StructType])
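As the hunk above shows, schema merging is resolved from the per-relation parameters first and only then from the session conf, so callers can override it per read. A hedged sketch, assuming the data source option key is "mergeSchema" and an existing sqlContext (neither introduced by this patch):

  // Sketch: the per-read option takes precedence over spark.sql.parquet.mergeSchema.
  val df = sqlContext.read.option("mergeSchema", "true").parquet("/path/to/parquet")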
@@ -330,6 +333,28 @@ private[sql] class ParquetRelation(
}
}

  def filterDataStatusesWithoutSummaries(
      leaves: Seq[FileStatus],
      dataStatuses: Seq[FileStatus]): Seq[FileStatus] = {
    // Get the parent directories that have summary files
    val directoriesWithSummaries = leaves.map(_.getPath.getParent.toString).toSet

    val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
    val dataPaths = dataStatuses.map(_.getPath.getParent)
    val dataPathsWithoutSummaries = PartitioningUtils.parsePartitions(dataPaths,
      PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference).partitions
      .map(_.path).filterNot(p => directoriesWithSummaries.contains(p)).toSet
Contributor comment on this hunk:
This is still problematic. For a non-partitioned table like this:

base/
  _metadata
  _common_metadata
  file-1
  file-2
  ...

parsePartitions always returns an empty PartitionSpec containing no Partitions, thus dataPathsWithoutSummaries is always empty, and we always merge all part-files, which is not expected behavior.

However, as I suggested in the other comment, we can probably just remove this method.


    if (dataPathsWithoutSummaries.size > 0) {
      dataStatuses.filter { d =>
        val path = d.getPath.getParent.toString
        dataPathsWithoutSummaries.contains(path)
      }
    } else {
      dataStatuses
    }
  }
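One possible way to address the reviewer's concern, sketched here only (not part of this patch, and the helper name is made up), is to compare parent directories directly instead of going through partition discovery:

  // Sketch: keep only the part-files whose parent directory has no summary file.
  // This behaves the same for partitioned and non-partitioned layouts because it
  // never calls parsePartitions.
  def filterDataStatusesWithoutSummariesSimple(
      leaves: Seq[FileStatus],
      dataStatuses: Seq[FileStatus]): Seq[FileStatus] = {
    val directoriesWithSummaries = leaves.map(_.getPath.getParent.toString).toSet
    dataStatuses.filterNot(d => directoriesWithSummaries.contains(d.getPath.getParent.toString))
  }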

private class MetadataCache {
// `FileStatus` objects of all "_metadata" files.
private var metadataStatuses: Array[FileStatus] = _
@@ -422,7 +447,30 @@ private[sql] class ParquetRelation(
      val filesToTouch =
        if (shouldMergeSchemas) {
          // Also includes summary files, 'cause there might be empty partition directories.
          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq

          // If the mergeRespectSummaries config is true, we assume that the schemas of all
          // part-files are consistent with the summary files, so we ignore the part-files
          // when merging the schema. If the config is false, which is the default setting,
          // we merge all part-files.
          //
          // mergeRespectSummaries is useful when dealing with partitioned tables, where each
          // partition directory contains its own summary files. In this mode, we only need to
          // merge the schemas contained in those summary files. For non-partitioned tables,
          // or for partition directories that don't contain summary files, we still merge
          // their part-files, because their schemas may differ from those in the summary
          // files. Enable this configuration only if you are sure that every partition
          // directory contains summary files whose schema is consistent with its part-files.

          val needMerged: Seq[FileStatus] =
            if (mergeRespectSummaries) {
              // Even when merging schemas while respecting summary files, we still need to
              // merge the part-files that live in directories without summary files.
              filterDataStatusesWithoutSummaries(metadataStatuses ++
                commonMetadataStatuses, dataStatuses)
            } else {
              dataStatuses
            }
          (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
        } else {
          // Tries any "_common_metadata" first. Parquet files written by old versions of
          // Parquet don't have this.
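To make the comment block above concrete, the layout this option targets looks roughly like the following (illustrative names only). With respectSummaryFiles enabled, only the summary files under each partition directory need to be read for schema merging, while part-files in any directory lacking summaries are still merged:

  base/
    foo=1/
      _metadata
      _common_metadata
      part-r-00000.gz.parquet
    foo=2/
      _metadata
      _common_metadata
      part-r-00000.gz.parquet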
@@ -17,10 +17,13 @@

package org.apache.spark.sql.parquet

import java.io.File

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.types._
import org.apache.spark.sql.{QueryTest, Row, SQLConf}
import org.apache.spark.util.Utils

/**
* A test suite that tests various Parquet queries.
@@ -123,6 +126,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
}
}

test("Enabling/disabling merging partfiles when merging parquet schema") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>
val basePath = dir.getCanonicalPath
sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString)
sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString)
// delete summary files, we still merge part-files without summary files.
Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
}
}

withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
testSchemaMerging(3)
}

withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") {
testSchemaMerging(3)
}
}

test("Enabling/disabling schema merging") {
def testSchemaMerging(expectedColumnNumber: Int): Unit = {
withTempDir { dir =>
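A note for anyone running the new test locally: it assumes the writer emits _metadata and _common_metadata summary files alongside the part-files. Parquet-MR controls this with the parquet.enable.summary-metadata Hadoop setting, which was on by default in the Parquet versions Spark shipped at the time. A hedged sketch of forcing it on before the writes (not part of this patch):

  // Sketch: make sure summary files are written, so the deletions in the test are meaningful.
  sqlContext.sparkContext.hadoopConfiguration
    .set("parquet.enable.summary-metadata", "true")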