diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java deleted file mode 100644 index 438fffa11784..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.parquet; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.HadoopReadOptions; -import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.format.converter.ParquetMetadataConverter; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.hadoop.util.HadoopInputFile; - -import org.apache.spark.sql.execution.datasources.PartitionedFile; - -/** - * `ParquetFooterReader` is a util class which encapsulates the helper - * methods of reading parquet file footer - */ -public class ParquetFooterReader { - - public static final boolean SKIP_ROW_GROUPS = true; - public static final boolean WITH_ROW_GROUPS = false; - - /** - * Reads footer for the input Parquet file 'split'. If 'skipRowGroup' is true, - * this will skip reading the Parquet row group metadata. - * - * @param file a part (i.e. 
"block") of a single file that should be read - * @param configuration hadoop configuration of file - * @param skipRowGroup If true, skip reading row groups; - * if false, read row groups according to the file split range - */ - public static ParquetMetadata readFooter( - Configuration configuration, - PartitionedFile file, - boolean skipRowGroup) throws IOException { - long fileStart = file.start(); - ParquetMetadataConverter.MetadataFilter filter; - if (skipRowGroup) { - filter = ParquetMetadataConverter.SKIP_ROW_GROUPS; - } else { - filter = HadoopReadOptions.builder(configuration, file.toPath()) - .withRange(fileStart, fileStart + file.length()) - .build() - .getMetadataFilter(); - } - return readFooter(configuration, file.toPath(), filter); - } - - public static ParquetMetadata readFooter(Configuration configuration, - Path file, ParquetMetadataConverter.MetadataFilter filter) throws IOException { - return readFooter(HadoopInputFile.fromPath(file, configuration), filter); - } - - public static ParquetMetadata readFooter(Configuration configuration, - FileStatus fileStatus, ParquetMetadataConverter.MetadataFilter filter) throws IOException { - return readFooter(HadoopInputFile.fromStatus(fileStatus, configuration), filter); - } - - private static ParquetMetadata readFooter(HadoopInputFile inputFile, - ParquetMetadataConverter.MetadataFilter filter) throws IOException { - ParquetReadOptions readOptions = - HadoopReadOptions.builder(inputFile.getConfiguration(), inputFile.getPath()) - .withMetadataFilter(filter).build(); - // Use try-with-resources to ensure fd is closed. - try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) { - return fileReader.getFooter(); - } - } -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index d3716ef18447..038112086e47 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -49,6 +49,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Types; @@ -89,24 +90,27 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader inputFile, + Option inputStream, Option fileFooter) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); FileSplit split = (FileSplit) inputSplit; this.file = split.getPath(); + ParquetReadOptions options = HadoopReadOptions + .builder(configuration, file) + .withRange(split.getStart(), split.getStart() + split.getLength()) + .build(); ParquetFileReader fileReader; - if (fileFooter.isDefined()) { - fileReader = new ParquetFileReader(configuration, file, fileFooter.get()); + if (inputFile.isDefined() && fileFooter.isDefined() && inputStream.isDefined()) { + fileReader = new ParquetFileReader( + inputFile.get(), fileFooter.get(), options, inputStream.get()); } else { - ParquetReadOptions options = HadoopReadOptions - .builder(configuration, file) - .withRange(split.getStart(), split.getStart() + split.getLength()) - 
.build(); fileReader = new ParquetFileReader( HadoopInputFile.fromPath(file, configuration), options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 9010c6e30be0..b15f79df527e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Set; -import org.apache.spark.SparkUnsupportedOperationException; -import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; import scala.Option; import scala.jdk.javaapi.CollectionConverters; @@ -35,11 +33,15 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; import org.apache.spark.sql.execution.vectorized.ConstantColumnVector; @@ -190,9 +192,11 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext, + Option inputFile, + Option inputStream, Option fileFooter) throws IOException, InterruptedException, UnsupportedOperationException { - super.initialize(inputSplit, taskAttemptContext, fileFooter); + super.initialize(inputSplit, taskAttemptContext, inputFile, inputStream, fileFooter); initializeInternal(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 2488b6aa5115..a434065af88c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -640,7 +640,7 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper { override def toPartitionArray: Array[PartitionedFile] = { partitionDirectories.flatMap { p => p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values, 0, f.getLen) + PartitionedFileUtil.getPartitionedFile(f, p.values, 0, f.getLen) } } } @@ -876,7 +876,6 @@ case class FileSourceScanExec( relation.sparkSession, relation.options, filePath) PartitionedFileUtil.splitFiles( file = file, - filePath = filePath, isSplitable = isSplitable, maxSplitBytes = maxSplitBytes, partitionValues = partitionVals diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala index 1411eae1aace..9ea23efbaec4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala @@ -17,16 
+17,14 @@ package org.apache.spark.sql.execution -import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} +import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus} -import org.apache.spark.paths.SparkPath import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources._ object PartitionedFileUtil { def splitFiles( file: FileStatusWithMetadata, - filePath: Path, isSplitable: Boolean, maxSplitBytes: Long, partitionValues: InternalRow): Seq[PartitionedFile] = { @@ -34,22 +32,20 @@ object PartitionedFileUtil { (0L until file.getLen by maxSplitBytes).map { offset => val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining - getPartitionedFile(file, filePath, partitionValues, offset, size) + getPartitionedFile(file, partitionValues, offset, size) } } else { - Seq(getPartitionedFile(file, filePath, partitionValues, 0, file.getLen)) + Seq(getPartitionedFile(file, partitionValues, 0, file.getLen)) } } def getPartitionedFile( file: FileStatusWithMetadata, - filePath: Path, partitionValues: InternalRow, start: Long, length: Long): PartitionedFile = { val hosts = getBlockHosts(getBlockLocations(file.fileStatus), start, length) - PartitionedFile(partitionValues, SparkPath.fromPath(filePath), start, length, hosts, - file.getModificationTime, file.getLen, file.metadata) + PartitionedFile(partitionValues, start, length, file.fileStatus, hosts, file.metadata) } private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 8a254b464da7..1f13ca0feac9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -22,7 +22,6 @@ import org.apache.hadoop.fs._ import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.hadoop.mapreduce.Job -import org.apache.spark.paths.SparkPath import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -315,9 +314,7 @@ object FileFormat { def createMetadataInternalRow( partitionValues: InternalRow, fieldNames: Seq[String], - filePath: SparkPath, - fileSize: Long, - fileModificationTime: Long): InternalRow = { + fileStatus: FileStatus): InternalRow = { // When scanning files directly from the filesystem, we only support file-constant metadata // fields whose values can be derived from a file status. In particular, we don't have accurate // file split information yet, nor do we have a way to provide custom metadata column values. 
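For review context, a minimal sketch of how a caller adapts to the new `createMetadataInternalRow` signature, which now takes a single FileStatus instead of separate path/size/modification-time arguments. The path, sizes, and metadata field names below are illustrative assumptions, not values taken from this patch:

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.FileFormat

// The FileStatus now carries the length and modification time that used to be
// passed as separate arguments (fileSize, fileModificationTime).
val status = new FileStatus(
  1024L,  // length in bytes (illustrative)
  false,  // isdir
  1,      // block replication
  0L,     // block size
  0L,     // modification time
  new Path("/tmp/example.parquet"))  // hypothetical path

val row: InternalRow = FileFormat.createMetadataInternalRow(
  partitionValues = InternalRow.empty,
  fieldNames = Seq("file_path", "file_size", "file_modification_time"),  // assumed constant metadata column names
  fileStatus = status)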
@@ -327,12 +324,10 @@ object FileFormat { assert(fieldNames.forall(validFieldNames.contains)) val pf = PartitionedFile( partitionValues = partitionValues, - filePath = filePath, start = 0L, - length = fileSize, + length = fileStatus.getLen, + fileStatus = fileStatus, locations = Array.empty, - modificationTime = fileModificationTime, - fileSize = fileSize, otherConstantMetadataColumnValues = Map.empty) updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames, pf, extractors) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala index 0291a5fd28a7..e786153d4469 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -21,7 +21,6 @@ import scala.collection.mutable import org.apache.hadoop.fs._ -import org.apache.spark.paths.SparkPath import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.StructType @@ -84,8 +83,7 @@ class FilePruningRunner(filters: Seq[Expression]) { // use option.forall, so if there is no filter no metadata struct, return true boundedFilterMetadataStructOpt.forall { boundedFilter => val row = - FileFormat.createMetadataInternalRow(partitionValues, requiredMetadataColumnNames.toSeq, - SparkPath.fromFileStatus(f), f.getLen, f.getModificationTime) + FileFormat.createMetadataInternalRow(partitionValues, requiredMetadataColumnNames.toSeq, f) boundedFilter.eval(row) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 5dc13ccee9ce..621004e28f8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources import java.io.{Closeable, FileNotFoundException} import java.net.URI -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hdfs.BlockMissingException import org.apache.hadoop.security.AccessControlException @@ -47,26 +47,25 @@ import org.apache.spark.util.NextIterator * that need to be prepended to each row. * * @param partitionValues value of partition columns to be prepended to each row. - * @param filePath URI of the file to read * @param start the beginning offset (in bytes) of the block. * @param length number of bytes to read. - * @param modificationTime The modification time of the input file, in milliseconds. - * @param fileSize The length of the input file (not the block), in bytes. + * @param fileStatus The FileStatus instance of the file to read. * @param otherConstantMetadataColumnValues The values of any additional constant metadata columns. 
*/ case class PartitionedFile( partitionValues: InternalRow, - filePath: SparkPath, start: Long, length: Long, + fileStatus: FileStatus, @transient locations: Array[String] = Array.empty, - modificationTime: Long = 0L, - fileSize: Long = 0L, otherConstantMetadataColumnValues: Map[String, Any] = Map.empty) { + @transient lazy val filePath: SparkPath = SparkPath.fromFileStatus(fileStatus) def pathUri: URI = filePath.toUri def toPath: Path = filePath.toPath def urlEncodedPath: String = filePath.urlEncoded + def modificationTime: Long = fileStatus.getModificationTime + def fileSize: Long = fileStatus.getLen override def toString: String = { s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index be6e5d188667..134faf5a5ff9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -30,6 +30,7 @@ import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop._ +import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -47,7 +48,7 @@ import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapC import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils} class ParquetFileFormat extends FileFormat @@ -202,145 +203,156 @@ class ParquetFileFormat (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) - val filePath = file.toPath - val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - + val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String]) val sharedConf = broadcastedHadoopConf.value.value - val fileFooter = if (enableVectorizedReader) { - // When there are vectorized reads, we can avoid reading the footer twice by reading - // all row groups in advance and filter row groups according to filters that require - // push down (no need to read the footer metadata again). - ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS) - } else { - ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS) - } - - val footerFileMetaData = fileFooter.getFileMetaData - val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - datetimeRebaseModeInRead) - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) - - // Try to push down filters when filter push-down is enabled. 
- val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringPredicate, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(parquetFilters.createFilter(_)) - .reduceOption(FilterApi.and) - } else { - None - } - - // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' - // *only* if the file was created by something other than "parquet-mr", so check the actual - // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. - // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. - def isCreatedByParquetMr: Boolean = - footerFileMetaData.getCreatedBy().startsWith("parquet-mr") - - val convertTz = - if (timestampConversion && !isCreatedByParquetMr) { - Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + // When there are vectorized reads, we can avoid + // 1. opening the file twice by transfering the SeekableInputStream + // 2. reading the footer twice by reading all row groups in advance and filter row groups + // according to filters that require push down + val (inputFileOpt, inputStreamOpt, fileFooter) = + ParquetFooterReader.openFileAndReadFooter(sharedConf, file, enableVectorizedReader) + + // Before transferring the ownership of inputStream to the vectorizedReader, + // we must take responsibility to close the inputStream if something goes wrong + // to avoid resource leak. + var shouldCloseInputStream = inputStreamOpt.isDefined + try { + val footerFileMetaData = fileFooter.getFileMetaData + val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringPredicate, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates + // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why + // a `flatMap` is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) } else { None } + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 + // timestamps *only* if the file was created by something other than "parquet-mr", + // so check the actual writer here for this file. We have to do this per-file, + // as each file in the table may have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. 
+ def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy.startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = - new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. - if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) - } - val taskContext = Option(TaskContext.get()) - if (enableVectorizedReader) { - val vectorizedReader = new VectorizedParquetRecordReader( - convertTz.orNull, - datetimeRebaseSpec.mode.toString, - datetimeRebaseSpec.timeZone, - int96RebaseSpec.mode.toString, - int96RebaseSpec.timeZone, - enableOffHeapColumnVector && taskContext.isDefined, - capacity) - // SPARK-37089: We cannot register a task completion listener to close this iterator here - // because downstream exec nodes have already registered their listeners. Since listeners - // are executed in reverse order of registration, a listener registered here would close the - // iterator while downstream exec nodes are still running. When off-heap column vectors are - // enabled, this can cause a use-after-free bug leading to a segfault. - // - // Instead, we use FileScanRDD's task completion listener to close this iterator. - val iter = new RecordReaderIterator(vectorizedReader) - try { - vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter)) - logDebug(s"Appending $partitionSchema ${file.partitionValues}") - vectorizedReader.initBatch(partitionSchema, file.partitionValues) - if (returningBatch) { - vectorizedReader.enableReturningBatches() - } + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) - // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. - iter.asInstanceOf[Iterator[InternalRow]] - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. 
+ if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } - } else { - logDebug(s"Falling back to parquet-mr") - // ParquetRecordReader returns InternalRow - val readSupport = new ParquetReadSupport( - convertTz, - enableVectorizedReader = false, - datetimeRebaseSpec, - int96RebaseSpec) - val reader = if (pushed.isDefined && enableRecordFilter) { - val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + // SPARK-37089: We cannot register a task completion listener to close this iterator + // here because downstream exec nodes have already registered their listeners. Since + // listeners are executed in reverse order of registration, a listener registered + // here would close the iterator while downstream exec nodes are still running. When + // off-heap column vectors are enabled, this can cause a use-after-free bug leading to + // a segfault. + // + // Instead, we use FileScanRDD's task completion listener to close this iterator. + val iter = new RecordReaderIterator(vectorizedReader) + try { + // We don't need to take care the close of inputStream because this transfers + // the ownership of inputStream to the vectorizedReader + vectorizedReader.initialize( + split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) + shouldCloseInputStream = false + logDebug(s"Appending $partitionSchema ${file.partitionValues}") + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator + // to avoid leaking resources. 
+ iter.close() + throw e + } } else { - new ParquetRecordReader[InternalRow](readSupport) - } - val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, - requiredSchema) - val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) - try { - readerWithRowIndexes.initialize(split, hadoopAttemptContext) - - val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema) - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - if (partitionSchema.length == 0) { - // There is no partition columns - iter.map(unsafeProjection) + logDebug(s"Falling back to parquet-mr") + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) } else { - val joinedRow = new JoinedRow() - iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + new ParquetRecordReader[InternalRow](readSupport) } - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e + val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, + requiredSchema) + val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) + try { + readerWithRowIndexes.initialize(split, hadoopAttemptContext) + + val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema) + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + } + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator + // to avoid leaking resources. + iter.close() + throw e + } + } + } finally { + if (shouldCloseInputStream) { + inputStreamOpt.foreach(Utils.closeQuietly) } } } @@ -447,9 +459,9 @@ object ParquetFileFormat extends Logging { // Skips row group information since we only need the schema. // ParquetFileReader.readFooter throws RuntimeException, instead of IOException, // when it can't read the footer. - Some(new Footer(currentFile.getPath(), + Some(new Footer(currentFile.getPath, ParquetFooterReader.readFooter( - conf, currentFile, SKIP_ROW_GROUPS))) + HadoopInputFile.fromStatus(currentFile, conf), SKIP_ROW_GROUPS))) } catch { case e: RuntimeException => if (ignoreCorruptFiles) { logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, currentFile)}", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala new file mode 100644 index 000000000000..0b18b69d7b14 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.HadoopReadOptions +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.hadoop.metadata.ParquetMetadata +import org.apache.parquet.hadoop.util.HadoopInputFile +import org.apache.parquet.io.SeekableInputStream + +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.util.Utils + +object ParquetFooterReader { + + /** + * Build a filter for reading the footer of the input Parquet file 'split'. + * If 'skipRowGroup' is true, this will skip reading the Parquet row group metadata. + * + * @param hadoopConf hadoop configuration of file + * @param file a part (i.e. "block") of a single file that should be read + * @param skipRowGroup If true, skip reading row groups; + * if false, read row groups according to the file split range + */ + def buildFilter( + hadoopConf: Configuration, + file: PartitionedFile, + skipRowGroup: Boolean): ParquetMetadataConverter.MetadataFilter = { + if (skipRowGroup) { + ParquetMetadataConverter.SKIP_ROW_GROUPS + } else { + HadoopReadOptions.builder(hadoopConf, file.toPath) + .withRange(file.start, file.start + file.length) + .build() + .getMetadataFilter + } + } + + def readFooter( + inputFile: HadoopInputFile, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + val readOptions = HadoopReadOptions.builder(inputFile.getConfiguration, inputFile.getPath) + .withMetadataFilter(filter).build() + Utils.tryWithResource(ParquetFileReader.open(inputFile, readOptions)) { fileReader => + fileReader.getFooter + } + } + + /** + * Decoding Parquet files generally involves two steps: + * 1. read and resolve the metadata (footer), + * 2. read and decode the row groups/column chunks. + * + * It's possible to avoid opening the file twice by reusing the SeekableInputStream. + * When detachFileInputStream is true, the caller takes responsibility for closing the + * SeekableInputStream. Currently, this is only supported by the parquet vectorized reader. + * + * @param hadoopConf hadoop configuration of file + * @param file a part (i.e. "block") of a single file that should be read + * @param detachFileInputStream when true, keep the SeekableInputStream of the file open + * @return if detachFileInputStream is true, return + * (Some(HadoopInputFile), Some(SeekableInputStream), ParquetMetadata), + * otherwise, return (None, None, ParquetMetadata).
+ */ + def openFileAndReadFooter( + hadoopConf: Configuration, + file: PartitionedFile, + detachFileInputStream: Boolean): + (Option[HadoopInputFile], Option[SeekableInputStream], ParquetMetadata) = { + val readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath) + .withMetadataFilter(buildFilter(hadoopConf, file, !detachFileInputStream)) + .build() + val inputFile = HadoopInputFile.fromStatus(file.fileStatus, hadoopConf) + val inputStream = inputFile.newStream() + Utils.tryWithResource( + ParquetFileReader.open(inputFile, readOptions, inputStream)) { fileReader => + val footer = fileReader.getFooter + if (detachFileInputStream) { + fileReader.detachFileInputStream() + (Some(inputFile), Some(inputStream), footer) + } else { + (None, None, footer) + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 5348f9ab6df6..6c5595f75b0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -159,11 +159,9 @@ trait FileScan extends Scan partition.values } partition.files.flatMap { file => - val filePath = file.getPath PartitionedFileUtil.splitFiles( file = file, - filePath = filePath, - isSplitable = isSplitable(filePath), + isSplitable = isSplitable(file.getPath), maxSplitBytes = maxSplitBytes, partitionValues = partitionValues ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 70ae8068a03a..d6f1eb91c1d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -23,8 +23,10 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} -import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader} +import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.util.HadoopInputFile +import org.apache.parquet.io.SeekableInputStream import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast @@ -86,17 +88,21 @@ case class ParquetPartitionReaderFactory( private val parquetReaderCallback = new ParquetReaderCallback() - private def getFooter(file: PartitionedFile): ParquetMetadata = { - val conf = broadcastedConf.value.value - if (aggregation.isDefined || enableVectorizedReader) { - // There are two purposes for reading footer with row groups: - // 1. When there are aggregates to push down, we get max/min/count from footer statistics. - // 2. When there are vectorized reads, we can avoid reading the footer twice by reading - // all row groups in advance and filter row groups according to filters that require - // push down (no need to read the footer metadata again). 
- ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.WITH_ROW_GROUPS) + private def openFileAndReadFooter(file: PartitionedFile): + (Option[HadoopInputFile], Option[SeekableInputStream], ParquetMetadata) = { + val hadoopConf = broadcastedConf.value.value + if (aggregation.isDefined) { + // When there are aggregates to push down, we get max/min/count from footer statistics. + val footer = ParquetFooterReader.readFooter( + HadoopInputFile.fromStatus(file.fileStatus, hadoopConf), + ParquetFooterReader.buildFilter(hadoopConf, file, skipRowGroup = false)) + (None, None, footer) } else { - ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.SKIP_ROW_GROUPS) + // When there are vectorized reads, we can avoid + // 1. opening the file twice by transfering the SeekableInputStream + // 2. reading the footer twice by reading all row groups in advance and filter row groups + // according to filters that require push down + ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, enableVectorizedReader) } } @@ -130,7 +136,7 @@ case class ParquetPartitionReaderFactory( new PartitionReader[InternalRow] { private var hasNext = true private lazy val row: InternalRow = { - val footer = getFooter(file) + val (_, _, footer) = openFileAndReadFooter(file) if (footer != null && footer.getBlocks.size > 0) { ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, @@ -175,7 +181,7 @@ case class ParquetPartitionReaderFactory( new PartitionReader[ColumnarBatch] { private var hasNext = true private val batch: ColumnarBatch = { - val footer = getFooter(file) + val (_, _, footer) = openFileAndReadFooter(file) if (footer != null && footer.getBlocks.size > 0) { val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, dataSchema, partitionSchema, aggregation.get, readDataSchema, file.partitionValues, @@ -211,74 +217,88 @@ case class ParquetPartitionReaderFactory( RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T] = { val conf = broadcastedConf.value.value - val filePath = file.toPath - val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - val fileFooter = getFooter(file) - val footerFileMetaData = fileFooter.getFileMetaData - val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData) - // Try to push down filters when filter push-down is enabled. - val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringPredicate, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(parquetFilters.createFilter) - .reduceOption(FilterApi.and) - } else { - None - } - // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' - // *only* if the file was created by something other than "parquet-mr", so check the actual - // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. - // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. 
- def isCreatedByParquetMr: Boolean = - footerFileMetaData.getCreatedBy().startsWith("parquet-mr") - - val convertTz = - if (timestampConversion && !isCreatedByParquetMr) { - Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String]) + val (inputFileOpt, inputStreamOpt, fileFooter) = openFileAndReadFooter(file) + // Before transferring the ownership of inputStream to the vectorizedReader, + // we must take responsibility to close the inputStream if something goes wrong + // to avoid resource leak. + var shouldCloseInputStream = inputStreamOpt.isDefined + try { + val footerFileMetaData = fileFooter.getFileMetaData + val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData) + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringPredicate, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) } else { None } - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. - if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) - } - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - int96RebaseModeInRead) - Utils.createResourceUninterruptiblyIfInTaskThread { - Utils.tryInitializeResource( - buildReaderFunc( - file.partitionValues, - pushed, - convertTz, - datetimeRebaseSpec, - int96RebaseSpec) - ) { reader => - reader match { - case vectorizedReader: VectorizedParquetRecordReader => - vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter)) - case _ => - reader.initialize(split, hadoopAttemptContext) + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. 
+ if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + int96RebaseModeInRead) + Utils.createResourceUninterruptiblyIfInTaskThread { + Utils.tryInitializeResource( + buildReaderFunc( + file.partitionValues, + pushed, + convertTz, + datetimeRebaseSpec, + int96RebaseSpec) + ) { reader => + reader match { + case vectorizedReader: VectorizedParquetRecordReader => + // We don't need to take care the close of inputStream because this transfers + // the ownership of inputStream to the vectorizedReader + vectorizedReader.initialize( + split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) + shouldCloseInputStream = false + case _ => + reader.initialize(split, hadoopAttemptContext) + } + reader } - reader + } + } finally { + if (shouldCloseInputStream) { + inputStreamOpt.foreach(Utils.closeQuietly) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index afeca756208e..df113790e0af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -286,10 +286,14 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession { test("Locality support for FileScanRDD") { val partition = FilePartition(0, Array( - PartitionedFile(InternalRow.empty, sp("fakePath0"), 0, 10, Array("host0", "host1")), - PartitionedFile(InternalRow.empty, sp("fakePath0"), 10, 20, Array("host1", "host2")), - PartitionedFile(InternalRow.empty, sp("fakePath1"), 0, 5, Array("host3")), - PartitionedFile(InternalRow.empty, sp("fakePath2"), 0, 5, Array("host4")) + PartitionedFile(InternalRow.empty, 0, 10, + new FileStatus(20, false, 3, 0, 0, sp("fakePath0").toPath), Array("host0", "host1")), + PartitionedFile(InternalRow.empty, 10, 20, + new FileStatus(20, false, 3, 0, 0, sp("fakePath0").toPath), Array("host1", "host2")), + PartitionedFile(InternalRow.empty, 0, 5, + new FileStatus(5, false, 3, 0, 0, sp("fakePath1").toPath), Array("host3")), + PartitionedFile(InternalRow.empty, 0, 5, + new FileStatus(5, false, 3, 0, 0, sp("fakePath2").toPath), Array("host4")) )) val fakeRDD = new FileScanRDD( @@ -605,8 +609,10 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession { } test(s"SPARK-44021: Test ${SQLConf.FILES_MAX_PARTITION_NUM.key} works as expected") { - val files = - Range(0, 300000).map(p => PartitionedFile(InternalRow.empty, sp(s"$p"), 0, 50000000)) + val files = Range(0, 300000).map { p => + PartitionedFile(InternalRow.empty, 0, 50000000, + new FileStatus(0, false, 1, 0, 0, sp(s"$p").toPath)) + } val maxPartitionBytes = conf.filesMaxPartitionBytes val defaultPartitions = FilePartition.getFilePartitions(spark, files, maxPartitionBytes) assert(defaultPartitions.size === 150000) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala index b6b89ab30439..095363f107d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.Files import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus import org.apache.spark.paths.SparkPath import org.apache.spark.sql.catalyst.InternalRow @@ -38,11 +39,12 @@ class HadoopFileLinesReaderSuite extends SharedSparkSession { Files.write(path.toPath, text.getBytes(StandardCharsets.UTF_8)) val lines = ranges.flatMap { case (start, length) => + val sp = SparkPath.fromPathString(path.getCanonicalPath) val file = PartitionedFile( InternalRow.empty, - SparkPath.fromPathString(path.getCanonicalPath), start, - length) + length, + new FileStatus(path.length(), false, 1, 0, 0, sp.toPath)) val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf()) val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 257a89754f4e..044f6ce202d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -22,6 +22,7 @@ import java.time.ZoneOffset import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER +import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.spark.sql.Row @@ -229,8 +230,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS // sure the test is configured correctly. 
assert(parts.size == 2) parts.foreach { part => - val oneFooter = - ParquetFooterReader.readFooter(hadoopConf, part.getPath, NO_FILTER) + val oneFooter = ParquetFooterReader.readFooter( + HadoopInputFile.fromStatus(part, hadoopConf), NO_FILTER) assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 1) val typeName = oneFooter .getFileMetaData.getSchema.getColumns.get(0).getPrimitiveType.getPrimitiveTypeName diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index b7b082e32965..12daef65eabc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -144,8 +144,8 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { protected def readFooter(path: Path, configuration: Configuration): ParquetMetadata = { ParquetFooterReader.readFooter( - configuration, - new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE), + HadoopInputFile.fromPath( + new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE), configuration), ParquetMetadataConverter.NO_FILTER) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d3f625542d96..769b633a9c52 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER +import org.apache.parquet.hadoop.util.HadoopInputFile import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{spy, times, verify} import org.scalatest.BeforeAndAfterEach @@ -2710,8 +2711,9 @@ class HiveDDLSuite OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name case "parquet" => + val hadoopConf = sparkContext.hadoopConfiguration val footer = ParquetFooterReader.readFooter( - sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath), NO_FILTER) + HadoopInputFile.fromPath(new Path(maybeFile.get.getPath), hadoopConf), NO_FILTER) footer.getBlocks.get(0).getColumns.get(0).getCodec.toString }
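To summarize the footer-reading contract this patch introduces, here is a minimal usage sketch. The helper name `readFooterAndCountRowGroups` is hypothetical; only APIs touched by this patch (`ParquetFooterReader.openFileAndReadFooter`, `Utils.closeQuietly`) are assumed. With `detachFileInputStream = true` the caller receives the still-open SeekableInputStream and owns it until it is handed off to `VectorizedParquetRecordReader.initialize`; if no hand-off happens, the caller must close it, mirroring the try/finally pattern in `ParquetFileFormat` and `ParquetPartitionReaderFactory`:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader
import org.apache.spark.util.Utils

// Hypothetical caller: open the file once, read the footer, and close the detached
// stream ourselves because it is never transferred to a vectorized reader here.
def readFooterAndCountRowGroups(hadoopConf: Configuration, file: PartitionedFile): Int = {
  val (_, inputStreamOpt, footer) =
    ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, detachFileInputStream = true)
  try {
    // Row-group metadata is available here because detachFileInputStream = true means
    // the footer is read without the SKIP_ROW_GROUPS filter.
    footer.getBlocks.size()
  } finally {
    // Ownership was not transferred, so the caller closes the stream.
    inputStreamOpt.foreach(Utils.closeQuietly)
  }
}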