[SPARK-2119][SQL] Improved Parquet performance when reading off S3
JIRA issue: [SPARK-2119](https://issues.apache.org/jira/browse/SPARK-2119)

Essentially, this PR fixes three issues to gain much better performance when reading large Parquet files off S3.

1. When reading the schema, fetch Parquet metadata from a part-file rather than the `_metadata` file

   The `_metadata` file contains the metadata of all row groups and can be very large if there are many row groups. Since schema information and row group metadata are coupled within a single Thrift object, we have to read the whole `_metadata` file just to fetch the schema. On the other hand, the schema is replicated in the footer of every part-file, and those footers are fairly small.

1. Only add the root directory of the Parquet file to the input paths, rather than all the part-files

   The HDFS API automatically filters out hidden files and underscore files (`_SUCCESS` & `_metadata`), so there is no need to list all the part-files ourselves and add them individually to the input paths. What makes this much worse is that `FileInputFormat.listStatus()` calls `FileSystem.globStatus()` on each input path sequentially, and each call results in a blocking remote S3 HTTP request. (A sketch of points 1 and 2 follows this list.)

1. Work around [PARQUET-16](https://issues.apache.org/jira/browse/PARQUET-16)

   Essentially, PARQUET-16 is similar to the issue above: it results in lots of sequential `FileSystem.getFileStatus()` calls, which in turn translate into many remote S3 HTTP requests.

   `FilteringParquetRowInputFormat` should be cleaned up once PARQUET-16 is fixed.
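
The sketch below illustrates points 1 and 2 only; it is not the exact code in this PR. It reads the schema from the footer of a single part-file and registers only the qualified root directory as the job input path. The names `path` (the table's root directory) and `conf` (a Hadoop `Configuration`) are assumptions made for the example.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import parquet.hadoop.ParquetFileReader

def sketch(path: String, conf: Configuration): Unit = {
  val rootPath = new Path(path)
  val fs = rootPath.getFileSystem(conf)

  // (1) The schema is replicated in the footer of every part-file, so reading
  // one small footer is enough; skip the potentially huge `_metadata` file.
  val partFile = fs
    .listStatus(rootPath)
    .find(f => !f.getPath.getName.startsWith("_"))
    .getOrElse(sys.error(s"No part-files found under $path"))
  val schema = ParquetFileReader
    .readFooter(conf, partFile.getPath)
    .getFileMetaData
    .getSchema
  println(s"Parquet schema:\n$schema")

  // (2) Register only the qualified root directory as the input path; the
  // input format's own listing filters out hidden and underscore files,
  // avoiding one globStatus() call (one S3 round trip) per part-file.
  val job = new Job(conf)
  FileInputFormat.addInputPath(job, fs.makeQualified(rootPath))
}
```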

Below are the micro-benchmark results. The dataset used is an S3 Parquet file consisting of 3,793 partitions, about 110MB per partition on average. The benchmark was run on a 9-node AWS cluster. A sketch of one way such timings could be collected follows the results.

- Creating a Parquet `SchemaRDD` (Parquet schema is fetched)

  ```scala
  val tweets = parquetFile(uri)
  ```

  - Before: 17.80s
  - After: 8.61s

- Fetching partition information

  ```scala
  tweets.getPartitions
  ```

  - Before: 700.87s
  - After: 21.47s

- Counting the whole file (both of the steps above are executed as part of this)

  ```scala
  parquetFile(uri).count()
  ```

  - Before: ??? (not tested yet)
  - After: 53.26s
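
For reference, here is a minimal sketch of how timings like the above could be collected from a `spark-shell` session. The actual benchmark harness is not part of this PR; `uri` and the `SQLContext` methods (`parquetFile`) are assumed to be in scope, and the `timed` helper is hypothetical.

```scala
// Hypothetical helper, not part of this PR: measures wall-clock time of a block.
def timed[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(f"$label: ${(System.nanoTime() - start) / 1e9}%.2fs")
  result
}

val tweets = timed("Schema fetch")(parquetFile(uri))
timed("Partition listing")(tweets.getPartitions)
timed("Full count")(parquetFile(uri).count())
```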

Author: Cheng Lian <[email protected]>

Closes apache#1370 from liancheng/faster-parquet and squashes the following commits:

94a2821 [Cheng Lian] Added comments about schema consistency
d2c4417 [Cheng Lian] Worked around PARQUET-16 to improve Parquet performance
1c0d1b9 [Cheng Lian] Accelerated Parquet schema retrieving
5bd3d29 [Cheng Lian] Fixed Parquet log level
liancheng authored and conviva-zz committed Sep 4, 2014
1 parent f42935d commit 0f9748c
Showing 3 changed files with 125 additions and 50 deletions.
@@ -17,27 +17,34 @@

package org.apache.spark.sql.parquet

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.util.Try

import java.io.IOException
import java.lang.{Long => JLong}
import java.text.SimpleDateFormat
import java.util.Date
import java.util.{Date, List => JList}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat, FileOutputCommitter}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.hadoop.{ParquetRecordReader, ParquetInputFormat, ParquetOutputFormat}
import parquet.hadoop.api.ReadSupport
import parquet.hadoop._
import parquet.hadoop.api.{InitContext, ReadSupport}
import parquet.hadoop.metadata.GlobalMetaData
import parquet.hadoop.util.ContextUtil
import parquet.io.InvalidRecordException
import parquet.io.ParquetDecodingException
import parquet.schema.MessageType

import org.apache.spark.{Logging, SerializableWritable, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Row}
import org.apache.spark.sql.execution.{LeafNode, SparkPlan, UnaryNode}
import org.apache.spark.{Logging, SerializableWritable, TaskContext}

/**
* Parquet table scan operator. Imports the file that backs the given
@@ -55,16 +62,14 @@ case class ParquetTableScan(
override def execute(): RDD[Row] = {
val sc = sqlContext.sparkContext
val job = new Job(sc.hadoopConfiguration)
ParquetInputFormat.setReadSupportClass(
job,
classOf[org.apache.spark.sql.parquet.RowReadSupport])
ParquetInputFormat.setReadSupportClass(job, classOf[RowReadSupport])

val conf: Configuration = ContextUtil.getConfiguration(job)
val fileList = FileSystemHelper.listFiles(relation.path, conf)
// add all paths in the directory but skip "hidden" ones such
// as "_SUCCESS" and "_metadata"
for (path <- fileList if !path.getName.startsWith("_")) {
NewFileInputFormat.addInputPath(job, path)
val qualifiedPath = {
val path = new Path(relation.path)
path.getFileSystem(conf).makeQualified(path)
}
NewFileInputFormat.addInputPath(job, qualifiedPath)

// Store both requested and original schema in `Configuration`
conf.set(
@@ -87,7 +92,7 @@ case class ParquetTableScan(

sc.newAPIHadoopRDD(
conf,
classOf[org.apache.spark.sql.parquet.FilteringParquetRowInputFormat],
classOf[FilteringParquetRowInputFormat],
classOf[Void],
classOf[Row])
.map(_._2)
@@ -122,14 +127,7 @@ case class ParquetTableScan(
private def validateProjection(projection: Seq[Attribute]): Boolean = {
val original: MessageType = relation.parquetSchema
val candidate: MessageType = ParquetTypesConverter.convertFromAttributes(projection)
try {
original.checkContains(candidate)
true
} catch {
case e: InvalidRecordException => {
false
}
}
Try(original.checkContains(candidate)).isSuccess
}
}

@@ -302,6 +300,11 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
*/
private[parquet] class FilteringParquetRowInputFormat
extends parquet.hadoop.ParquetInputFormat[Row] with Logging {

private var footers: JList[Footer] = _

private var fileStatuses = Map.empty[Path, FileStatus]

override def createRecordReader(
inputSplit: InputSplit,
taskAttemptContext: TaskAttemptContext): RecordReader[Void, Row] = {
@@ -318,6 +321,70 @@ private[parquet] class FilteringParquetRowInputFormat
new ParquetRecordReader[Row](readSupport)
}
}

override def getFooters(jobContext: JobContext): JList[Footer] = {
if (footers eq null) {
val statuses = listStatus(jobContext)
fileStatuses = statuses.map(file => file.getPath -> file).toMap
footers = getFooters(ContextUtil.getConfiguration(jobContext), statuses)
}

footers
}

// TODO Remove this method and related code once PARQUET-16 is fixed
// This method together with the `getFooters` method and the `fileStatuses` field are just used
// to mimic this PR: https://github.com/apache/incubator-parquet-mr/pull/17
override def getSplits(
configuration: Configuration,
footers: JList[Footer]): JList[ParquetInputSplit] = {

val maxSplitSize: JLong = configuration.getLong("mapred.max.split.size", Long.MaxValue)
val minSplitSize: JLong =
Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L))
if (maxSplitSize < 0 || minSplitSize < 0) {
throw new ParquetDecodingException(
s"maxSplitSize or minSplitSie should not be negative: maxSplitSize = $maxSplitSize;" +
s" minSplitSize = $minSplitSize")
}

val getGlobalMetaData =
classOf[ParquetFileWriter].getDeclaredMethod("getGlobalMetaData", classOf[JList[Footer]])
getGlobalMetaData.setAccessible(true)
val globalMetaData = getGlobalMetaData.invoke(null, footers).asInstanceOf[GlobalMetaData]

val readContext = getReadSupport(configuration).init(
new InitContext(configuration,
globalMetaData.getKeyValueMetaData(),
globalMetaData.getSchema()))

val generateSplits =
classOf[ParquetInputFormat[_]].getDeclaredMethods.find(_.getName == "generateSplits").get
generateSplits.setAccessible(true)

val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
for (footer <- footers) {
val fs = footer.getFile.getFileSystem(configuration)
val file = footer.getFile
val fileStatus = fileStatuses.getOrElse(file, fs.getFileStatus(file))
val parquetMetaData = footer.getParquetMetadata
val blocks = parquetMetaData.getBlocks
val fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen)
splits.addAll(
generateSplits.invoke(
null,
blocks,
fileBlockLocations,
fileStatus,
parquetMetaData.getFileMetaData,
readContext.getRequestedSchema.toString,
readContext.getReadSupportMetadata,
minSplitSize,
maxSplitSize).asInstanceOf[JList[ParquetInputSplit]])
}

splits
}
}

private[parquet] object FileSystemHelper {
@@ -17,20 +17,19 @@

package org.apache.spark.sql.parquet

import org.apache.hadoop.conf.Configuration
import java.util.{HashMap => JHashMap}

import org.apache.hadoop.conf.Configuration
import parquet.column.ParquetProperties
import parquet.hadoop.ParquetOutputFormat
import parquet.hadoop.api.ReadSupport.ReadContext
import parquet.hadoop.api.{ReadSupport, WriteSupport}
import parquet.io.api._
import parquet.schema.{MessageType, MessageTypeParser}
import parquet.schema.MessageType

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.{Attribute, Row}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.execution.SparkSqlSerializer
import com.google.common.io.BaseEncoding

/**
* A `parquet.io.api.RecordMaterializer` for Rows.
@@ -93,8 +92,8 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
configuration: Configuration,
keyValueMetaData: java.util.Map[String, String],
fileSchema: MessageType): ReadContext = {
var parquetSchema: MessageType = fileSchema
var metadata: java.util.Map[String, String] = new java.util.HashMap[String, String]()
var parquetSchema = fileSchema
val metadata = new JHashMap[String, String]()
val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)

if (requestedAttributes != null) {
@@ -109,7 +108,7 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)
}

return new ReadSupport.ReadContext(parquetSchema, metadata)
new ReadSupport.ReadContext(parquetSchema, metadata)
}
}

@@ -132,13 +131,17 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
private[parquet] var attributes: Seq[Attribute] = null

override def init(configuration: Configuration): WriteSupport.WriteContext = {
attributes = if (attributes == null) RowWriteSupport.getSchema(configuration) else attributes

val origAttributesStr: String = configuration.get(RowWriteSupport.SPARK_ROW_SCHEMA)
val metadata = new JHashMap[String, String]()
metadata.put(RowReadSupport.SPARK_METADATA_KEY, origAttributesStr)

if (attributes == null) {
attributes = ParquetTypesConverter.convertFromString(origAttributesStr)
}

log.debug(s"write support initialized for requested schema $attributes")
ParquetRelation.enableLogForwarding()
new WriteSupport.WriteContext(
ParquetTypesConverter.convertFromAttributes(attributes),
new java.util.HashMap[java.lang.String, java.lang.String]())
new WriteSupport.WriteContext(ParquetTypesConverter.convertFromAttributes(attributes), metadata)
}

override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
@@ -22,6 +22,7 @@ import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.hadoop.{ParquetFileReader, Footer, ParquetFileWriter}
import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData}
@@ -367,20 +368,24 @@ private[parquet] object ParquetTypesConverter extends Logging {
s"Expected $path for be a directory with Parquet files/metadata")
}
ParquetRelation.enableLogForwarding()
val metadataPath = new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)
// if this is a new table that was just created we will find only the metadata file
if (fs.exists(metadataPath) && fs.isFile(metadataPath)) {
ParquetFileReader.readFooter(conf, metadataPath)
} else {
// there may be one or more Parquet files in the given directory
val footers = ParquetFileReader.readFooters(conf, fs.getFileStatus(path))
// TODO: for now we assume that all footers (if there is more than one) have identical
// metadata; we may want to add a check here at some point
if (footers.size() == 0) {
throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path")
}
footers(0).getParquetMetadata

val children = fs.listStatus(path).filterNot {
_.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME
}

// NOTE (lian): Reading the Parquet "_metadata" file can be very slow if the file consists of
// lots of row groups. Since the Parquet schema is replicated among all row groups, we only need
// to touch a single row group to read schema-related metadata. Notice that we are assuming that
// all data in a single Parquet file have the same schema, which is normally true.
children
// Try any non-"_metadata" file first...
.find(_.getPath.getName != ParquetFileWriter.PARQUET_METADATA_FILE)
// ... and fall back to "_metadata" if no such file exists (which implies the Parquet file is
// empty, thus normally the "_metadata" file is expected to be fairly small).
.orElse(children.find(_.getPath.getName == ParquetFileWriter.PARQUET_METADATA_FILE))
.map(ParquetFileReader.readFooter(conf, _))
.getOrElse(
throw new IllegalArgumentException(s"Could not find Parquet metadata at path $path"))
}

/**
Expand All @@ -403,7 +408,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
} else {
val attributes = convertToAttributes(
readMetaData(origPath, conf).getFileMetaData.getSchema)
log.warn(s"Falling back to schema conversion from Parquet types; result: $attributes")
log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
attributes
}
}
