From 6e47c5135bf9c0d345b77cc958b94e954ae8f030 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 3 Apr 2015 00:54:27 +0800
Subject: [PATCH 1/2] Tries to skip row groups when reading footers

---
 .../apache/spark/sql/parquet/newParquet.scala | 25 +++++++++++--------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 19800ad88c03..92fb6a7761da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -33,9 +33,7 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
-
 import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
 import parquet.hadoop.{ParquetInputFormat, _}
@@ -226,7 +224,7 @@ private[sql] case class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    var footers: Map[FileStatus, Footer] = _
+    var footers: Map[Path, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -292,11 +290,16 @@ private[sql] case class ParquetRelation2(
       commonMetadataStatuses =
         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
 
-      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
-        val parquetMetadata = ParquetFileReader.readFooter(
-          sparkContext.hadoopConfiguration, f, ParquetMetadataConverter.NO_FILTER)
-        f -> new Footer(f.getPath, parquetMetadata)
-      }.seq.toMap
+      footers = {
+        val taskSideMetaData =
+          sparkContext.hadoopConfiguration.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
+
+        val rawFooters =
+          ParquetFileReader.readAllFootersInParallel(
+            sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
+
+        rawFooters.map { footer => footer.getFile -> footer }.toMap
+      }
 
       partitionSpec = maybePartitionSpec.getOrElse {
         val partitionDirs = leaves
@@ -381,7 +384,7 @@ private[sql] case class ParquetRelation2(
           .toSeq
       }
 
-      ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
     }
   }
 
@@ -426,7 +429,7 @@ private[sql] case class ParquetRelation2(
     } else {
       metadataCache.dataStatuses.toSeq
     }
-    val selectedFooters = selectedFiles.map(metadataCache.footers)
+    val selectedFooters = selectedFiles.map(f => metadataCache.footers(f.getPath))
 
     // FileInputFormat cannot handle empty lists.
     if (selectedFiles.nonEmpty) {
@@ -774,7 +777,7 @@ private[sql] object ParquetRelation2 extends Logging {
     val ordinalMap = metastoreSchema.zipWithIndex.map {
       case (field, index) => field.name.toLowerCase -> index
     }.toMap
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f => 
+    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
       ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
 
     StructType(metastoreSchema.zip(reorderedParquetSchema).map {

From ac1c49058c63583c688526177fda5d1f594a894a Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Sat, 11 Apr 2015 16:02:34 +0800
Subject: [PATCH 2/2] Reads footers from summary files when possible

---
 .../scala/org/apache/spark/sql/parquet/newParquet.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 92fb6a7761da..b0c6bb668037 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -294,11 +294,15 @@ private[sql] case class ParquetRelation2(
         val taskSideMetaData =
           sparkContext.hadoopConfiguration.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
 
-        val rawFooters =
+        val rawFooters = if (shouldMergeSchemas) {
           ParquetFileReader.readAllFootersInParallel(
             sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
+        } else {
+          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+            sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
+        }
 
-        rawFooters.map { footer => footer.getFile -> footer }.toMap
+        rawFooters.map(footer => footer.getFile -> footer).toMap
       }
 
       partitionSpec = maybePartitionSpec.getOrElse {
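
Note (not part of the patch series): below is a self-contained sketch of the footer-loading logic the two patches converge on, for readers following along outside the diff. The `FooterReading` object, the `readFooters` helper name, the `skipRowGroups` variable, and the use of `JavaConverters` are illustrative assumptions; the actual change inlines this logic in `ParquetRelation2`'s metadata cache and converts collections with `seqAsJavaList`.

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import parquet.hadoop.{Footer, ParquetFileReader, ParquetInputFormat}

// Hypothetical standalone helper mirroring the patched logic.
object FooterReading {
  def readFooters(
      conf: Configuration,
      leaves: Seq[FileStatus],
      shouldMergeSchemas: Boolean): Map[Path, Footer] = {
    // With task-side metadata enabled (defaulted to true here, as in the
    // patch), row groups are filtered by individual tasks, so the driver
    // can skip them while reading footers.
    val skipRowGroups = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)

    val rawFooters = if (shouldMergeSchemas) {
      // Schema merging must touch every part-file's own footer.
      ParquetFileReader.readAllFootersInParallel(conf, leaves.asJava, skipRowGroups)
    } else {
      // Otherwise, _metadata / _common_metadata summary files, when present,
      // stand in for per-file footers, saving one footer read per part-file.
      ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
        conf, leaves.asJava, skipRowGroups)
    }

    // Key the cache by Path rather than FileStatus, so a footer can be looked
    // up from any FileStatus referring to the same file.
    rawFooters.asScala.map(footer => footer.getFile -> footer).toMap
  }
}

The split mirrors the two commits: when schema merging is on, every part-file footer is read in parallel (skipping row groups where task-side metadata allows); when it is off, summary files are consulted instead, avoiding a read per part-file on tables with many files.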