diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a930f638ca664..09fb414fe429d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -967,6 +967,20 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val FILE_META_CACHE_PARQUET_ENABLED = buildConf("spark.sql.fileMetaCache.parquet.enabled")
+    .doc("Whether to enable the Parquet file metadata cache. It is recommended to enable " +
+      "this config when multiple queries are performed on the same dataset. Default is false.")
+    .version("3.3.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS =
+    buildConf("spark.sql.fileMetaCache.ttlSinceLastAccess")
+      .version("3.3.0")
+      .doc("Time-to-live in seconds for a file metadata cache entry after its last access.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefault(3600L)
+
   val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
     .doc("When true, check all the partition paths under the table\'s root directory " +
       "when reading data stored in HDFS. This configuration will be deprecated in the future " +
@@ -3600,6 +3614,8 @@ class SQLConf extends Serializable with Logging {
 
   def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)
 
+  def fileMetaCacheParquetEnabled: Boolean = getConf(FILE_META_CACHE_PARQUET_ENABLED)
+
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
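The enable flag is a per-session SQL conf, while the TTL is read from the Spark conf via `SparkEnv` (see `FileMetaCacheManager` below), so the latter is best supplied at application launch. A minimal illustrative snippet, assuming an existing `spark` session:

```scala
// Illustrative only: turn the Parquet footer cache on for the current session.
spark.conf.set("spark.sql.fileMetaCache.parquet.enabled", "true")

// The TTL is read from SparkEnv's SparkConf by FileMetaCacheManager, so pass it at launch time:
//   spark-submit --conf spark.sql.fileMetaCache.ttlSinceLastAccess=600 ...
```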
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 6264d6341c65a..9d6dec45af332 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -39,11 +39,16 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.BadConfigurationException;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.util.ConfigurationUtil;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.schema.MessageType;
@@ -77,6 +82,8 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Void, T> {
+  protected ParquetMetadata cachedFooter;
+
-    Map<String, String> fileMetadata = reader.getFileMetaData().getKeyValueMetaData();
+    ParquetMetadata footer =
+      readFooterByRange(configuration, split.getStart(), split.getStart() + split.getLength());
+    this.fileSchema = footer.getFileMetaData().getSchema();
+    FilterCompat.Filter filter = ParquetInputFormat.getFilter(configuration);
+    List<BlockMetaData> blocks =
+      RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema);
+    Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
     ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
       taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
     this.requestedSchema = readContext.getRequestedSchema();
-    reader.setRequestedSchema(requestedSchema);
     String sparkRequestedSchemaString =
         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
+    this.reader = new ParquetFileReader(
+      configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
     this.totalRowCount = reader.getFilteredRecordCount();
 
     // For test purpose.
@@ -116,6 +124,28 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
     }
   }
 
+  public void setCachedFooter(ParquetMetadata cachedFooter) {
+    this.cachedFooter = cachedFooter;
+  }
+
+  private ParquetMetadata readFooterByRange(Configuration configuration,
+      long start, long end) throws IOException {
+    if (cachedFooter != null) {
+      List<BlockMetaData> filteredBlocks = new ArrayList<>();
+      List<BlockMetaData> blocks = cachedFooter.getBlocks();
+      for (BlockMetaData block : blocks) {
+        long offset = block.getStartingPos();
+        if (offset >= start && offset < end) {
+          filteredBlocks.add(block);
+        }
+      }
+      return new ParquetMetadata(cachedFooter.getFileMetaData(), filteredBlocks);
+    } else {
+      return ParquetFileReader
+        .readFooter(configuration, file, ParquetMetadataConverter.range(start, end));
+    }
+  }
+
   /**
    * Returns the list of files at 'path' recursively. This skips files that are ignored normally
    * by MapReduce.
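The cached footer always describes the whole file, while each task reads only one split, so `readFooterByRange` narrows the cached row groups to those whose starting offset falls inside `[start, end)`. The same idea in a standalone Scala sketch (names here are illustrative, not part of the patch):

```scala
import scala.collection.JavaConverters._
import org.apache.parquet.hadoop.metadata.ParquetMetadata

// Keep only the row groups of a whole-file footer that start within the split range.
def narrowFooterToSplit(cached: ParquetMetadata, start: Long, end: Long): ParquetMetadata = {
  val splitBlocks = cached.getBlocks.asScala.filter { block =>
    val offset = block.getStartingPos
    offset >= start && offset < end
  }
  new ParquetMetadata(cached.getFileMetaData, splitBlocks.asJava)
}
```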
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala
new file mode 100644
index 0000000000000..3d1016d04b4ce
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.concurrent.TimeUnit
+
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
+import com.github.benmanes.caffeine.cache.stats.CacheStats
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A singleton cache manager for caching file metadata. We cache file metadata in order to speed
+ * up repeated queries over the same dataset. Otherwise, each query would have to hit remote
+ * storage to fetch the file metadata before reading the files.
+ *
+ * A corresponding `FileMetaKey` should be implemented for each specific file format, for example
+ * `ParquetFileMetaKey` or `OrcFileMetaKey`. By default, the file path is used as the identity of
+ * a `FileMetaKey`, and its `getFileMeta` method returns the file metadata of the corresponding
+ * file format.
+ */
+object FileMetaCacheManager extends Logging {
+
+  private lazy val cacheLoader = new CacheLoader[FileMetaKey, FileMeta]() {
+    override def load(entry: FileMetaKey): FileMeta = {
+      logDebug(s"Loading Data File Meta ${entry.path}")
+      entry.getFileMeta
+    }
+  }
+
+  private lazy val ttlTime =
+    SparkEnv.get.conf.get(SQLConf.FILE_META_CACHE_TTL_SINCE_LAST_ACCESS)
+
+  private lazy val cache = Caffeine
+    .newBuilder()
+    .expireAfterAccess(ttlTime, TimeUnit.SECONDS)
+    .recordStats()
+    .build[FileMetaKey, FileMeta](cacheLoader)
+
+  /**
+   * Returns the `FileMeta` associated with the `FileMetaKey` in the `FileMetaCacheManager`,
+   * obtaining the `FileMeta` from `cacheLoader.load(FileMetaKey)` if necessary.
+   */
+  def get(dataFile: FileMetaKey): FileMeta = cache.get(dataFile)
+
+  /**
+   * This is visible for testing.
+   */
+  def cacheStats: CacheStats = cache.stats()
+
+  /**
+   * This is visible for testing.
+   */
+  def cleanUp(): Unit = cache.cleanUp()
+}
+
+abstract class FileMetaKey {
+  def path: Path
+  def configuration: Configuration
+  def getFileMeta: FileMeta
+  override def hashCode(): Int = path.hashCode
+  override def equals(other: Any): Boolean = other match {
+    case df: FileMetaKey => path.equals(df.path)
+    case _ => false
+  }
+}
+
+trait FileMeta
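As the scaladoc notes, other formats can reuse the cache by supplying their own key. A hypothetical sketch of an ORC counterpart (no ORC key is part of this change; `OrcFileMetaKey`/`OrcFileMeta` are made-up names, and only the file schema is cached here):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.{OrcFile, TypeDescription}

import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaKey}

// Hypothetical: key an ORC file by its path, exactly like ParquetFileMetaKey does for Parquet.
case class OrcFileMetaKey(path: Path, configuration: Configuration) extends FileMetaKey {
  override def getFileMeta: OrcFileMeta = OrcFileMeta(path, configuration)
}

// Hypothetical: cache only the ORC schema; a real implementation would pick the metadata it
// needs and take care of closing the reader.
class OrcFileMeta(val schema: TypeDescription) extends FileMeta

object OrcFileMeta {
  def apply(path: Path, conf: Configuration): OrcFileMeta = {
    val reader = OrcFile.createReader(path, OrcFile.readerOptions(conf))
    new OrcFileMeta(reader.getSchema)
  }
}
```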
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index e2fe5b4486213..dd3e7a59c3a4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -228,6 +228,10 @@ class ParquetFileFormat
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
 
+    hadoopConf.setBoolean(
+      SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key,
+      sparkSession.sessionState.conf.fileMetaCacheParquetEnabled)
+
     val broadcastedHadoopConf =
       sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
@@ -263,9 +267,14 @@ class ParquetFileFormat
       val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
 
       val sharedConf = broadcastedHadoopConf.value.value
+      val metaCacheEnabled =
+        sharedConf.getBoolean(SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key, false)
 
-      lazy val footerFileMetaData =
+      lazy val footerFileMetaData = if (metaCacheEnabled) {
+        ParquetFileMeta.readFooterFromCache(filePath, sharedConf).getFileMetaData
+      } else {
         ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+      }
       val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
         footerFileMetaData.getKeyValueMetaData.get,
         datetimeRebaseModeInRead)
@@ -327,6 +336,11 @@ class ParquetFileFormat
           int96RebaseMode.toString,
           enableOffHeapColumnVector && taskContext.isDefined,
           capacity)
+        // Set the cached footer before calling initialize().
+        if (metaCacheEnabled) {
+          val footer = ParquetFileMeta.readFooterFromCache(filePath, sharedConf)
+          vectorizedReader.setCachedFooter(footer)
+        }
         val iter = new RecordReaderIterator(vectorizedReader)
         // SPARK-23457 Register a task completion listener before `initialization`.
         taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala
new file mode 100644
index 0000000000000..2042c33194fa2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
+
+import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaCacheManager, FileMetaKey}
+
+case class ParquetFileMetaKey(path: Path, configuration: Configuration)
+  extends FileMetaKey {
+  override def getFileMeta: ParquetFileMeta = ParquetFileMeta(path, configuration)
+}
+
+class ParquetFileMeta(val footer: ParquetMetadata) extends FileMeta
+
+object ParquetFileMeta {
+  def apply(path: Path, conf: Configuration): ParquetFileMeta = {
+    new ParquetFileMeta(ParquetFileReader.readFooter(conf, path, NO_FILTER))
+  }
+
+  def readFooterFromCache(path: Path, conf: Configuration): ParquetMetadata =
+    readFooterFromCache(ParquetFileMetaKey(path, conf))
+
+  def readFooterFromCache(key: ParquetFileMetaKey): ParquetMetadata =
+    FileMetaCacheManager.get(key).asInstanceOf[ParquetFileMeta].footer
+}
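Put together, the read path above reduces to one cache lookup per file. A minimal sketch of that lookup (the path is a placeholder, and a live `SparkEnv` is assumed because the cache reads its TTL from it):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.datasources.FileMetaCacheManager
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileMeta

val conf = new Configuration()
val filePath = new Path("/tmp/warehouse/t/part-00000.parquet")  // placeholder path

// The first call reads the footer from storage and populates the cache; later calls
// for the same path are served from memory until the TTL expires.
val footer = ParquetFileMeta.readFooterFromCache(filePath, conf)
println(s"row groups: ${footer.getBlocks.size()}, cache: ${FileMetaCacheManager.cacheStats}")
```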
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
index 058669b0937fa..555b6e5f2143e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala
@@ -77,6 +77,7 @@ case class ParquetPartitionReaderFactory(
   private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal
   private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
+  private val parquetMetaCacheEnabled = sqlConf.fileMetaCacheParquetEnabled
   private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
   private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
 
@@ -131,8 +132,11 @@ case class ParquetPartitionReaderFactory(
     val filePath = new Path(new URI(file.filePath))
     val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
 
-    lazy val footerFileMetaData =
+    lazy val footerFileMetaData = if (parquetMetaCacheEnabled) {
+      ParquetFileMeta.readFooterFromCache(filePath, conf).getFileMetaData
+    } else {
       ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
+    }
     val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
       footerFileMetaData.getKeyValueMetaData.get,
       datetimeRebaseModeInRead)
@@ -249,6 +253,12 @@ case class ParquetPartitionReaderFactory(
       int96RebaseMode.toString,
       enableOffHeapColumnVector && taskContext.isDefined,
       capacity)
+    // Set the cached footer before calling initialize().
+    if (parquetMetaCacheEnabled) {
+      val fileMeta =
+        ParquetFileMeta.readFooterFromCache(split.getPath, hadoopAttemptContext.getConfiguration)
+      vectorizedReader.setCachedFooter(fileMeta)
+    }
     val iter = new RecordReaderIterator(vectorizedReader)
     // SPARK-23457 Register a task completion listener before `initialization`.
     taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
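The DataSource V2 path is gated by the same flag. To exercise `ParquetPartitionReaderFactory` rather than the V1 `ParquetFileFormat` reader, parquet can be dropped from the existing `spark.sql.sources.useV1SourceList` conf; an illustrative snippet:

```scala
// Illustrative only: route parquet reads through the DataSource V2 reader factory.
spark.conf.set("spark.sql.sources.useV1SourceList", "")
spark.conf.set("spark.sql.fileMetaCache.parquet.enabled", "true")
spark.read.parquet("/tmp/warehouse/t").filter("id > 5").show()  // placeholder path
```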
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 9ef43995467c6..80183ff9e663e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol}
+import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol}
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
@@ -933,6 +933,39 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
       }
     }
   }
+
+  test("SPARK-33449: simple select queries with file meta cache") {
+    withSQLConf(SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key -> "true") {
+      val tableName = "parquet_use_meta_cache"
+      withTable(tableName) {
+        (0 until 10).map(i => (i, i.toString)).toDF("id", "value")
+          .write.saveAsTable(tableName)
+        try {
+          val statsBeforeQuery = FileMetaCacheManager.cacheStats
+          checkAnswer(sql(s"SELECT id FROM $tableName where id > 5"),
+            (6 until 10).map(Row.apply(_)))
+          val statsAfterQuery1 = FileMetaCacheManager.cacheStats
+          // The 1st query triggers 4 file meta reads: 2 for the pushed-down filter and
+          // 2 for the file reads. The 1st query runs twice (df.collect() and
+          // df.rdd.count()), so it triggers 8 file meta reads in total. missCount is 2
+          // because the cache is empty and 2 file metas need to be loaded; the other
+          // 6 reads are served from the cache.
+          assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2)
+          assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6)
+          checkAnswer(sql(s"SELECT id FROM $tableName where id < 5"),
+            (0 until 5).map(Row.apply(_)))
+          val statsAfterQuery2 = FileMetaCacheManager.cacheStats
+          // The 2nd query also triggers 8 file meta reads in total and all of them
+          // are served from the cache, so missCount does not grow and hitCount
+          // increases by 8.
+          assert(statsAfterQuery2.missCount() - statsAfterQuery1.missCount() == 0)
+          assert(statsAfterQuery2.hitCount() - statsAfterQuery1.hitCount() == 8)
+        } finally {
+          FileMetaCacheManager.cleanUp()
+        }
+      }
+    }
+  }
 }
 
 class ParquetV2QuerySuite extends ParquetQuerySuite {
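The same hit/miss accounting can be observed interactively in local mode (where the driver and executor share the `FileMetaCacheManager` singleton); a rough sketch mirroring the test, with the table name taken from it:

```scala
import org.apache.spark.sql.execution.datasources.FileMetaCacheManager

spark.conf.set("spark.sql.fileMetaCache.parquet.enabled", "true")
val before = FileMetaCacheManager.cacheStats
spark.sql("SELECT id FROM parquet_use_meta_cache WHERE id > 5").collect()
val afterFirst = FileMetaCacheManager.cacheStats
spark.sql("SELECT id FROM parquet_use_meta_cache WHERE id > 5").collect()
val afterSecond = FileMetaCacheManager.cacheStats
// Only the first run should add misses; the second run should only add hits.
println(s"new misses: ${afterFirst.missCount() - before.missCount()}, " +
  s"hits from 2nd run: ${afterSecond.hitCount() - afterFirst.hitCount()}")
```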