@@ -33,9 +33,7 @@ import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}

import parquet.filter2.predicate.FilterApi
import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop.metadata.CompressionCodecName
import parquet.hadoop.util.ContextUtil
import parquet.hadoop.{ParquetInputFormat, _}
@@ -226,7 +224,7 @@ private[sql] case class ParquetRelation2(
private var commonMetadataStatuses: Array[FileStatus] = _

// Parquet footer cache.
var footers: Map[FileStatus, Footer] = _
var footers: Map[Path, Footer] = _
Contributor


I think the advantage of making this a Map[FileStatus, Footer] is that, if the file changes, the cache is automatically invalidated. Though perhaps this does not matter if we are also caching those?

Also, I think we need to bound the size of this, perhaps in some global way?

Contributor Author


FileStatus objects are also cached, so this should be OK. Bounding the size can be a good idea.
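A minimal sketch of what a bounded, Path-keyed footer cache could look like, using Guava's CacheBuilder. This is not part of the patch; the 1000-entry cap, the conf/fs parameters, and the method name are all assumptions made for illustration:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import parquet.format.converter.ParquetMetadataConverter
import parquet.hadoop.{Footer, ParquetFileReader}

// Sketch only: a size-bounded footer cache keyed by Path. The cap is arbitrary
// and would presumably come from a SQL conf in a real change.
def boundedFooterCache(conf: Configuration, fs: FileSystem): LoadingCache[Path, Footer] =
  CacheBuilder.newBuilder()
    .maximumSize(1000)
    .build(new CacheLoader[Path, Footer] {
      override def load(path: Path): Footer = {
        // Re-read the footer on a cache miss, as the original uncached code path does.
        val status = fs.getFileStatus(path)
        val metadata =
          ParquetFileReader.readFooter(conf, status, ParquetMetadataConverter.NO_FILTER)
        new Footer(path, metadata)
      }
    })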


// `FileStatus` objects of all data files (Parquet part-files).
var dataStatuses: Array[FileStatus] = _
@@ -292,11 +290,20 @@ private[sql] case class ParquetRelation2(
commonMetadataStatuses =
  leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)

footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
  val parquetMetadata = ParquetFileReader.readFooter(
    sparkContext.hadoopConfiguration, f, ParquetMetadataConverter.NO_FILTER)
  f -> new Footer(f.getPath, parquetMetadata)
}.seq.toMap
footers = {
  val taskSideMetaData =
    sparkContext.hadoopConfiguration.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)

  val rawFooters = if (shouldMergeSchemas) {
    ParquetFileReader.readAllFootersInParallel(
      sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
  } else {
    ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
      sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
  }

  rawFooters.map(footer => footer.getFile -> footer).toMap
}

partitionSpec = maybePartitionSpec.getOrElse {
  val partitionDirs = leaves
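Since readAllFootersInParallel and readAllFootersInParallelUsingSummaryFiles return footers that already carry their file, keying the map by footer.getFile means later lookups go through a file's Path rather than its FileStatus, as the hunks below show. A minimal usage sketch (status stands for one of the cached FileStatus values; the setBoolean call only illustrates how the parquet-hadoop flag read above could be overridden):

// Look up the cached footer for a data file; the map is now keyed by Path.
val footer: Footer = footers(status.getPath)

// Task-side metadata can be disabled through the Hadoop configuration if the
// driver should read row-group metadata eagerly (the default used above is true).
sparkContext.hadoopConfiguration.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, false)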
@@ -381,7 +388,7 @@ private[sql] case class ParquetRelation2(
.toSeq
}

ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
}
}

@@ -426,7 +433,7 @@ private[sql] case class ParquetRelation2(
} else {
  metadataCache.dataStatuses.toSeq
}
val selectedFooters = selectedFiles.map(metadataCache.footers)
val selectedFooters = selectedFiles.map(f => metadataCache.footers(f.getPath))

// FileInputFormat cannot handle empty lists.
if (selectedFiles.nonEmpty) {
@@ -774,7 +781,7 @@ private[sql] object ParquetRelation2 extends Logging {
val ordinalMap = metastoreSchema.zipWithIndex.map {
  case (field, index) => field.name.toLowerCase -> index
}.toMap
val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
  ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))

StructType(metastoreSchema.zip(reorderedParquetSchema).map {
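To make the reordering concrete, here is a small sketch with invented field names, assuming Spark SQL's StructType/StructField: the metastore schema is lower-cased and fixes the field order, while the merged Parquet schema keeps the writer's casing, so sorting by the ordinal map lines each Parquet field up with its metastore counterpart before the two are zipped.

import org.apache.spark.sql.types._

// Hypothetical schemas: the metastore reports lower-case names in its own order,
// while the merged Parquet schema preserves the writer's casing and order.
val metastoreSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("name", StringType)))
val mergedParquetSchema = Seq(
  StructField("NAME", StringType),
  StructField("ID", LongType))

val ordinalMap = metastoreSchema.zipWithIndex.map {
  case (field, index) => field.name.toLowerCase -> index
}.toMap

// sortBy yields Seq(StructField("ID", ...), StructField("NAME", ...)); fields missing
// from the metastore schema would sort to the end via the getOrElse default.
val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
  ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))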