[SPARK-52011][SQL] Reduce HDFS NameNode RPC on vectorized Parquet reader #50765
SpecificParquetRecordReaderBase.java

```diff
@@ -49,6 +49,7 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;
@@ -89,24 +90,27 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
       throws IOException, InterruptedException {
-    initialize(inputSplit, taskAttemptContext, Option.empty());
+    initialize(inputSplit, taskAttemptContext, Option.empty(), Option.empty(), Option.empty());
   }
 
   public void initialize(
       InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext,
+      Option<HadoopInputFile> inputFile,
+      Option<SeekableInputStream> inputStream,
       Option<ParquetMetadata> fileFooter) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     FileSplit split = (FileSplit) inputSplit;
     this.file = split.getPath();
+    ParquetReadOptions options = HadoopReadOptions
+      .builder(configuration, file)
+      .withRange(split.getStart(), split.getStart() + split.getLength())
+      .build();
     ParquetFileReader fileReader;
-    if (fileFooter.isDefined()) {
-      fileReader = new ParquetFileReader(configuration, file, fileFooter.get());
+    if (inputFile.isDefined() && fileFooter.isDefined() && inputStream.isDefined()) {
+      fileReader = new ParquetFileReader(
+        inputFile.get(), fileFooter.get(), options, inputStream.get());
     } else {
-      ParquetReadOptions options = HadoopReadOptions
-        .builder(configuration, file)
-        .withRange(split.getStart(), split.getStart() + split.getLength())
-        .build();
       fileReader = new ParquetFileReader(
         HadoopInputFile.fromPath(file, configuration), options);
     }
```
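For illustration, a sketch (not part of the diff) of how a caller is expected to drive the new overload: open the file once from a `FileStatus` it already holds, create one `SeekableInputStream`, and hand both to the reader together with the already-parsed footer so the reader neither re-opens the file nor re-reads the footer. The wrapper name and parameters are made up; only the `initialize` signature comes from the change above.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.{InputSplit, TaskAttemptContext}
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

object InitVectorizedReaderSketch {
  def initWithReusedStream(
      reader: VectorizedParquetRecordReader,
      split: InputSplit,
      context: TaskAttemptContext,
      status: FileStatus,
      footer: ParquetMetadata,
      conf: Configuration): Unit = {
    // No getFileStatus RPC here: the FileStatus comes from the driver's file listing.
    val inputFile = HadoopInputFile.fromStatus(status, conf)
    // One stream serves both the footer read (done by the caller) and the column reads.
    val inputStream = inputFile.newStream()
    reader.initialize(split, context, Some(inputFile), Some(inputStream), Some(footer))
  }
}
```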
PartitionedFile.scala

```diff
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.io.{Closeable, FileNotFoundException}
 import java.net.URI
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hdfs.BlockMissingException
 import org.apache.hadoop.security.AccessControlException
@@ -50,6 +50,7 @@ import org.apache.spark.util.NextIterator
  * @param filePath URI of the file to read
  * @param start the beginning offset (in bytes) of the block.
  * @param length number of bytes to read.
+ * @param fileStatus The FileStatus instance of the file to read.
  * @param modificationTime The modification time of the input file, in milliseconds.
  * @param fileSize The length of the input file (not the block), in bytes.
  * @param otherConstantMetadataColumnValues The values of any additional constant metadata columns.
@@ -59,6 +60,7 @@ case class PartitionedFile(
     filePath: SparkPath,
     start: Long,
     length: Long,
+    fileStatus: FileStatus,
```
Review thread on the new `fileStatus` field:

Contributor: Similarly, due to the addition of …

Contributor: In addition, since …

Author (pan3793): The `fileStatus` should occupy a little bit more memory, but I haven't seen OOM issues during the rollout of this change to our online cluster.

Contributor: @cloud-fan Are there also risks of breaking internal APIs with modifications similar to those made here and in …

Contributor: What's the cost of serializing the file status?

Author (pan3793): @cloud-fan I think …

Contributor: Is it possible to have a custom serde for it and only send the path string? This reminds me of …

Author (pan3793): Seems not feasible, because …

Author (pan3793): @cloud-fan the change basically moves the RPC cost from executor => storage service to driver => executors. In my env (HDFS with RBF), the latter is much cheaper than the former. I don't have a cloud env, so I can't give numbers for object storage services like S3.

Contributor: Hmm, then this may cause regression for short queries?

Member: Hmm, not sure how much difference this will make in terms of driver memory usage. Is it easy to make the …? It seems in Parquet Java the file status is only used in one case: https://github.com/apache/parquet-java/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java#L109-L132 — mostly we just need the file path and length. But this one use case seems critical to avoid a duplicated NN call to get the file status again.

Author (pan3793): @sunchao thanks for your suggestion. After an offline discussion with @cloud-fan, I understand his concerns about the overhead of …, so I'm going to split this PR into two parts.
PartitionedFile.scala (continued):

```diff
     @transient locations: Array[String] = Array.empty,
     modificationTime: Long = 0L,
     fileSize: Long = 0L,
```
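On the serialization-cost question above: a standalone sketch (not part of the PR) that measures the Java-serialized size of one `FileStatus`, assuming a Hadoop 3.x client where `FileStatus` implements `java.io.Serializable`. The object name and sample values are made up for illustration; real statuses come from the driver's file listing.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

import org.apache.hadoop.fs.{FileStatus, Path}

object FileStatusSerSizeSketch {
  // Serialize one object with plain Java serialization and report its size,
  // a rough upper bound on the per-file overhead added to PartitionedFile.
  def serializedSize(status: FileStatus): Int = {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(status)
    oos.close()
    bytes.size()
  }

  def main(args: Array[String]): Unit = {
    val status = new FileStatus(
      1024L * 1024, false, 3, 128L * 1024 * 1024, System.currentTimeMillis(),
      new Path("hdfs://nn/warehouse/db/tbl/part-00000.snappy.parquet"))
    println(s"serialized FileStatus: ${serializedSize(status)} bytes")
  }
}
```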
ParquetFileFormat.scala

```diff
@@ -26,10 +26,13 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.HadoopReadOptions
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
+import org.apache.parquet.hadoop.util.HadoopInputFile
 
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
@@ -207,15 +210,31 @@ class ParquetFileFormat
       val sharedConf = broadcastedHadoopConf.value.value
 
-      val fileFooter = if (enableVectorizedReader) {
-        // When there are vectorized reads, we can avoid reading the footer twice by reading
-        // all row groups in advance and filter row groups according to filters that require
-        // push down (no need to read the footer metadata again).
-        ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS)
+      // When there are vectorized reads, we can avoid
+      // 1. opening the file twice by transfering the SeekableInputStream
+      // 2. reading the footer twice by reading all row groups in advance and filter row groups
+      //    according to filters that require push down
+      val metadataFilter = if (enableVectorizedReader) {
+        HadoopReadOptions.builder(sharedConf, filePath)
+          .withRange(file.start, file.start + file.length)
+          .build.getMetadataFilter
       } else {
-        ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
+        ParquetMetadataConverter.SKIP_ROW_GROUPS
       }
 
+      val readOptions = HadoopReadOptions.builder(sharedConf, filePath)
+        .withMetadataFilter(metadataFilter).build
+
+      val inputFile = HadoopInputFile.fromStatus(file.fileStatus, sharedConf)
+      val inputStream = inputFile.newStream()
+      val fileReader = ParquetFileReader.open(inputFile, readOptions, inputStream)
+      val fileFooter = fileReader.getFooter
+      if (enableVectorizedReader) {
+        // Keep the file input stream open so it can be reused later
+        fileReader.detachFileInputStream()
+      }
+      fileReader.close()
+
       val footerFileMetaData = fileFooter.getFileMetaData
       val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
         footerFileMetaData.getKeyValueMetaData.get,
```
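The saving the new code targets comes from how the Parquet `HadoopInputFile` is constructed. A small illustration (not PR code): `fromPath` has to resolve a `FileStatus` through the `FileSystem`, which in current parquet-java means one `getFileStatus` NameNode RPC per file, while `fromStatus` reuses the `FileStatus` the Spark driver already collected when listing the table, so the task side issues no extra metadata call.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.hadoop.util.HadoopInputFile

object InputFileOpenCostSketch {
  def viaPath(path: Path, conf: Configuration): HadoopInputFile =
    HadoopInputFile.fromPath(path, conf)      // resolves the status via fs.getFileStatus

  def viaStatus(status: FileStatus, conf: Configuration): HadoopInputFile =
    HadoopInputFile.fromStatus(status, conf)  // reuses the driver-provided status
}
```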
ParquetFileFormat.scala (continued):

```diff
@@ -289,7 +308,8 @@
         // Instead, we use FileScanRDD's task completion listener to close this iterator.
         val iter = new RecordReaderIterator(vectorizedReader)
         try {
-          vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter))
+          vectorizedReader.initialize(
+            split, hadoopAttemptContext, Some(inputFile), Some(inputStream), Some(fileFooter))
           logDebug(s"Appending $partitionSchema ${file.partitionValues}")
           vectorizedReader.initBatch(partitionSchema, file.partitionValues)
           if (returningBatch) {
@@ -447,9 +467,9 @@ object ParquetFileFormat extends Logging {
           // Skips row group information since we only need the schema.
           // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
           // when it can't read the footer.
-          Some(new Footer(currentFile.getPath(),
+          Some(new Footer(currentFile.getPath,
             ParquetFooterReader.readFooter(
-              conf, currentFile, SKIP_ROW_GROUPS)))
+              HadoopInputFile.fromStatus(currentFile, conf), SKIP_ROW_GROUPS)))
         } catch { case e: RuntimeException =>
           if (ignoreCorruptFiles) {
             logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, currentFile)}", e)
```
ParquetPartitionReaderFactory.scala

```diff
@@ -21,10 +21,14 @@ import java.time.ZoneId
 import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.parquet.HadoopReadOptions
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
-import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader}
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.util.HadoopInputFile
+import org.apache.parquet.io.SeekableInputStream
 
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
```
```diff
@@ -86,17 +90,43 @@
   private val parquetReaderCallback = new ParquetReaderCallback()
 
-  private def getFooter(file: PartitionedFile): ParquetMetadata = {
-    val conf = broadcastedConf.value.value
-    if (aggregation.isDefined || enableVectorizedReader) {
-      // There are two purposes for reading footer with row groups:
-      // 1. When there are aggregates to push down, we get max/min/count from footer statistics.
-      // 2. When there are vectorized reads, we can avoid reading the footer twice by reading
-      //    all row groups in advance and filter row groups according to filters that require
-      //    push down (no need to read the footer metadata again).
-      ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.WITH_ROW_GROUPS)
+  private def getFooter(file: PartitionedFile):
+      (Option[HadoopInputFile], Option[SeekableInputStream], ParquetMetadata) = {
+    val hadoopConf = broadcastedConf.value.value
+    if (aggregation.isDefined) {
+      // When there are aggregates to push down, we get max/min/count from footer statistics.
+      val footer = ParquetFooterReader.readFooter(
+        hadoopConf, file, ParquetFooterReader.WITH_ROW_GROUPS)
+      (None, None, footer)
     } else {
-      ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.SKIP_ROW_GROUPS)
+      // When there are vectorized reads, we can avoid
+      // 1. opening the file twice by transfering the SeekableInputStream
+      // 2. reading the footer twice by reading all row groups in advance and filter row groups
+      //    according to filters that require push down
+      val metadataFilter = if (enableVectorizedReader) {
+        HadoopReadOptions.builder(hadoopConf, file.toPath)
+          .withRange(file.start, file.start + file.length)
+          .build.getMetadataFilter
+      } else {
+        ParquetMetadataConverter.SKIP_ROW_GROUPS
+      }
+      val readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath)
+        .withMetadataFilter(metadataFilter).build
+
+      val inputFile = HadoopInputFile.fromStatus(file.fileStatus, hadoopConf)
+      val inputStream = inputFile.newStream()
+      val fileReader = ParquetFileReader.open(inputFile, readOptions, inputStream)
+      val fileFooter = fileReader.getFooter
+      if (enableVectorizedReader) {
+        // Keep the file input stream open so it can be reused later
+        fileReader.detachFileInputStream()
+      }
+      fileReader.close()
+      if (enableVectorizedReader) {
+        (Some(inputFile), Some(inputStream), fileFooter)
+      } else {
+        (None, None, fileFooter)
+      }
     }
   }
```

Review thread on this method:

Member: Can we extract this into a new util method in …?

Author (pan3793): Addressed, please check the updated … BTW, I don't see a special reason to write this file in Java, as I'm going to use Scala data structures (…

Member: There was …

Author (pan3793): Yes, the logic is controlled by …
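The reviewer's suggestion above is to pull this open-once logic into a shared helper so `ParquetFileFormat` and `ParquetPartitionReaderFactory` do not duplicate it. A hedged sketch of what such a helper could look like; the actual method added in the updated PR (reportedly named `openFileAndReadFooter`) may differ in location and signature, and `ParquetFileReader.open(InputFile, ParquetReadOptions, SeekableInputStream)` plus `detachFileInputStream()` are the APIs used by this diff, so they require a Parquet release that provides them.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.io.SeekableInputStream
import org.apache.spark.sql.execution.datasources.PartitionedFile

object ParquetFooterHelperSketch {
  // Open the file once, read the footer with the requested metadata filter, and
  // optionally keep the stream open so the vectorized reader can reuse it.
  def openFileAndReadFooter(
      conf: Configuration,
      file: PartitionedFile,
      readRowGroups: Boolean,
      keepInputStreamOpen: Boolean)
    : (Option[HadoopInputFile], Option[SeekableInputStream], ParquetMetadata) = {
    val metadataFilter = if (readRowGroups) {
      // Only row groups overlapping [start, start + length) are kept in the footer.
      HadoopReadOptions.builder(conf, file.toPath)
        .withRange(file.start, file.start + file.length)
        .build.getMetadataFilter
    } else {
      ParquetMetadataConverter.SKIP_ROW_GROUPS
    }
    val readOptions = HadoopReadOptions.builder(conf, file.toPath)
      .withMetadataFilter(metadataFilter).build
    val inputFile = HadoopInputFile.fromStatus(file.fileStatus, conf)
    val inputStream = inputFile.newStream()
    val reader = ParquetFileReader.open(inputFile, readOptions, inputStream)
    try {
      val footer = reader.getFooter
      if (keepInputStreamOpen) {
        // Detach the stream so reader.close() below does not close it.
        reader.detachFileInputStream()
        (Some(inputFile), Some(inputStream), footer)
      } else {
        (None, None, footer)
      }
    } finally {
      reader.close()
    }
  }
}
```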
ParquetPartitionReaderFactory.scala (continued):

```diff
@@ -130,7 +160,7 @@
       new PartitionReader[InternalRow] {
         private var hasNext = true
         private lazy val row: InternalRow = {
-          val footer = getFooter(file)
+          val (_, _, footer) = getFooter(file)
           if (footer != null && footer.getBlocks.size > 0) {
             ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath,
@@ -175,7 +205,7 @@
       new PartitionReader[ColumnarBatch] {
         private var hasNext = true
         private val batch: ColumnarBatch = {
-          val footer = getFooter(file)
+          val (_, _, footer) = getFooter(file)
           if (footer != null && footer.getBlocks.size > 0) {
             val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath,
               dataSchema, partitionSchema, aggregation.get, readDataSchema, file.partitionValues,
@@ -213,7 +243,7 @@
     val filePath = file.toPath
     val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
-    val fileFooter = getFooter(file)
+    val (inputFile, inputStream, fileFooter) = getFooter(file)
     val footerFileMetaData = fileFooter.getFileMetaData
     val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData)
     // Try to push down filters when filter push-down is enabled.
@@ -274,7 +304,8 @@
     ) { reader =>
       reader match {
         case vectorizedReader: VectorizedParquetRecordReader =>
-          vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter))
+          vectorizedReader.initialize(
+            split, hadoopAttemptContext, inputFile, inputStream, Some(fileFooter))
         case _ =>
           reader.initialize(split, hadoopAttemptContext)
       }
```
Reviewer: We can remove the `SKIP_ROW_GROUPS` and `WITH_ROW_GROUPS` now, I think, as they are no longer used.

Author (pan3793): `SKIP_ROW_GROUPS` is unused now, but `WITH_ROW_GROUPS` is used by the `ParquetPartitionReaderFactory.openFileAndReadFooter` agg push-down case. I removed them and replaced `WITH_ROW_GROUPS` with the literal `false` instead.
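For context on that last point: per the discussion, the two constants are just boolean names for the `skipRowGroup` flag in Spark's `ParquetFooterReader`, so the aggregate push-down call site can pass the literal directly. A minimal sketch of such a call, assuming the three-argument `readFooter` overload that the old code in this diff already relies on; the wrapper name is illustrative.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader

object AggPushDownFooterSketch {
  // `false` = do not skip row groups, i.e. what the removed WITH_ROW_GROUPS
  // constant used to spell out; aggregates need the row-group statistics.
  def readFooterWithRowGroups(conf: Configuration, file: PartitionedFile): ParquetMetadata =
    ParquetFooterReader.readFooter(conf, file, false)
}
```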