[SPARK-33449][SQL] Support File Metadata Cache for Parquet #30483
Changes from all commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -967,6 +967,20 @@ object SQLConf { | |
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val FILE_META_CACHE_PARQUET_ENABLED = buildConf("spark.sql.fileMetaCache.parquet.enabled") | ||
| .doc("To indicate if enable parquet file meta cache, it is recommended to enabled " + | ||
| "this config when multiple queries are performed on the same dataset, default is false.") | ||
| .version("3.3.0") | ||
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS = | ||
|
Member
nit: maybe
Contributor (Author)
good suggestion |
||
| buildConf("spark.sql.fileMetaCache.ttlSinceLastAccess") | ||
| .version("3.3.0") | ||
| .doc("Time-to-live for file metadata cache entry after last access, the unit is seconds.") | ||
| .timeConf(TimeUnit.SECONDS) | ||
| .createWithDefault(3600L) | ||
|
Contributor (Author)
change the default value to 1 hour (3600s) |
||
|
|
||
| val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath") | ||
| .doc("When true, check all the partition paths under the table\'s root directory " + | ||
| "when reading data stored in HDFS. This configuration will be deprecated in the future " + | ||
|
|
@@ -3600,6 +3614,8 @@ class SQLConf extends Serializable with Logging { | |
|
|
||
| def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE) | ||
|
|
||
| def fileMetaCacheParquetEnabled: Boolean = getConf(FILE_META_CACHE_PARQUET_ENABLED) | ||
|
|
||
| def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) | ||
|
|
||
| def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) | ||
|
|
||
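As a quick illustration of how the two configs added above would be used, here is a minimal sketch of a Spark session that enables the Parquet footer cache. The config keys and the 3600s default come from this diff; the session builder, app name, and dataset path are purely illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session setup; only the two config keys are taken from this PR.
val spark = SparkSession.builder()
  .appName("file-meta-cache-example")
  .config("spark.sql.fileMetaCache.parquet.enabled", "true")    // enable the Parquet footer cache
  .config("spark.sql.fileMetaCache.ttlSinceLastAccess", "3600") // evict entries 1 hour after last access
  .getOrCreate()

// Repeated queries over the same dataset are the intended use case: the second
// scan should be able to reuse cached footers instead of re-reading them.
val df = spark.read.parquet("/path/to/dataset")
df.filter("id > 0").count()
df.filter("id > 100").count()
```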
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,11 +39,16 @@ | |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; | ||
| import org.apache.parquet.HadoopReadOptions; | ||
| import org.apache.parquet.ParquetReadOptions; | ||
| import org.apache.parquet.filter2.compat.FilterCompat; | ||
| import org.apache.parquet.filter2.compat.RowGroupFilter; | ||
| import org.apache.parquet.format.converter.ParquetMetadataConverter; | ||
| import org.apache.parquet.hadoop.BadConfigurationException; | ||
| import org.apache.parquet.hadoop.ParquetFileReader; | ||
| import org.apache.parquet.hadoop.ParquetInputFormat; | ||
| import org.apache.parquet.hadoop.api.InitContext; | ||
| import org.apache.parquet.hadoop.api.ReadSupport; | ||
| import org.apache.parquet.hadoop.metadata.ParquetMetadata; | ||
| import org.apache.parquet.hadoop.metadata.BlockMetaData; | ||
| import org.apache.parquet.hadoop.util.ConfigurationUtil; | ||
| import org.apache.parquet.hadoop.util.HadoopInputFile; | ||
| import org.apache.parquet.schema.MessageType; | ||
|
|
@@ -77,28 +82,31 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo | |
|
|
||
| protected ParquetFileReader reader; | ||
|
|
||
| protected ParquetMetadata cachedFooter; | ||
|
|
||
| @Override | ||
| public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) | ||
| throws IOException, InterruptedException { | ||
| Configuration configuration = taskAttemptContext.getConfiguration(); | ||
| FileSplit split = (FileSplit) inputSplit; | ||
| this.file = split.getPath(); | ||
|
|
||
| ParquetReadOptions options = HadoopReadOptions | ||
| .builder(configuration) | ||
| .withRange(split.getStart(), split.getStart() + split.getLength()) | ||
| .build(); | ||
| this.reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options); | ||
| this.fileSchema = reader.getFileMetaData().getSchema(); | ||
| Map<String, String> fileMetadata = reader.getFileMetaData().getKeyValueMetaData(); | ||
| ParquetMetadata footer = | ||
|
Contributor (Author)
@dongjoon-hyun The key problem is here: after SPARK-32703, we use a new API to create a
Member
Oh, got it. Thank you for pointing out this issue, @LuciferYang. |
||
| readFooterByRange(configuration, split.getStart(), split.getStart() + split.getLength()); | ||
| this.fileSchema = footer.getFileMetaData().getSchema(); | ||
| FilterCompat.Filter filter = ParquetInputFormat.getFilter(configuration); | ||
| List<BlockMetaData> blocks = | ||
| RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema); | ||
|
Member
Does this apply all the filter levels, e.g., stats, dictionary, and bloom filter?
Contributor (Author)
I need to investigate it again |
||
| Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData(); | ||
| ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration)); | ||
| ReadSupport.ReadContext readContext = readSupport.init(new InitContext( | ||
| taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema)); | ||
| this.requestedSchema = readContext.getRequestedSchema(); | ||
| reader.setRequestedSchema(requestedSchema); | ||
| String sparkRequestedSchemaString = | ||
| configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA()); | ||
| this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString); | ||
| this.reader = new ParquetFileReader( | ||
| configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); | ||
| this.totalRowCount = reader.getFilteredRecordCount(); | ||
|
|
||
| // For test purpose. | ||
|
|
@@ -116,6 +124,28 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont | |
| } | ||
| } | ||
|
|
||
| public void setCachedFooter(ParquetMetadata cachedFooter) { | ||
|
Contributor (Author)
If we don't want to add a similar API, we can also retrieve the footer from the cache in this file |
||
| this.cachedFooter = cachedFooter; | ||
| } | ||
|
|
||
| private ParquetMetadata readFooterByRange(Configuration configuration, | ||
| long start, long end) throws IOException { | ||
| if (cachedFooter != null) { | ||
| List<BlockMetaData> filteredBlocks = new ArrayList<>(); | ||
| List<BlockMetaData> blocks = cachedFooter.getBlocks(); | ||
| for (BlockMetaData block : blocks) { | ||
| long offset = block.getStartingPos(); | ||
| if (offset >= start && offset < end) { | ||
| filteredBlocks.add(block); | ||
| } | ||
| } | ||
| return new ParquetMetadata(cachedFooter.getFileMetaData(), filteredBlocks); | ||
| } else { | ||
| return ParquetFileReader | ||
| .readFooter(configuration, file, ParquetMetadataConverter.range(start, end)); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Returns the list of files at 'path' recursively. This skips files that are ignored normally | ||
| * by MapReduce. | ||
|
|
||
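For readers skimming the Java diff above, here is a short Scala restatement of the range-filtering idea in `readFooterByRange` (not part of the PR): when a cached footer is available, only the row groups whose starting offset falls inside the current split are kept, and a split-local footer is rebuilt from the cached file-level metadata.

```scala
import scala.collection.JavaConverters._

import org.apache.parquet.hadoop.metadata.ParquetMetadata

// Keep only the row groups that start inside [splitStart, splitEnd) and rebuild
// a footer restricted to this split, reusing the cached FileMetaData.
def filterCachedFooter(
    cachedFooter: ParquetMetadata,
    splitStart: Long,
    splitEnd: Long): ParquetMetadata = {
  val filteredBlocks = cachedFooter.getBlocks.asScala.filter { block =>
    val offset = block.getStartingPos
    offset >= splitStart && offset < splitEnd
  }
  new ParquetMetadata(cachedFooter.getFileMetaData, filteredBlocks.asJava)
}
```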
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,87 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.datasources | ||
|
|
||
| import java.util.concurrent.TimeUnit | ||
|
|
||
| import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} | ||
| import com.github.benmanes.caffeine.cache.stats.CacheStats | ||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs.Path | ||
|
|
||
| import org.apache.spark.SparkEnv | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.internal.SQLConf | ||
|
|
||
| /** | ||
| * A singleton cache manager for caching file meta. We cache these file metas in order to speed up | ||
| * repeated queries over the same dataset. Otherwise, each query would have to hit remote storage | ||
| * to fetch the file meta before reading files. | ||
| * | ||
| * We should implement the corresponding `FileMetaKey` for a specific file format, for example | ||
| * `ParquetFileMetaKey` or `OrcFileMetaKey`. By default, the file path is used as the identity | ||
| * of the `FileMetaKey`, and the `getFileMeta` method of `FileMetaKey` returns the file | ||
| * meta of the corresponding file format. | ||
| */ | ||
| object FileMetaCacheManager extends Logging { | ||
|
|
||
| private lazy val cacheLoader = new CacheLoader[FileMetaKey, FileMeta]() { | ||
| override def load(entry: FileMetaKey): FileMeta = { | ||
| logDebug(s"Loading Data File Meta ${entry.path}") | ||
| entry.getFileMeta | ||
| } | ||
| } | ||
|
|
||
| private lazy val ttlTime = | ||
| SparkEnv.get.conf.get(SQLConf.FILE_META_CACHE_TTL_SINCE_LAST_ACCESS) | ||
|
|
||
| private lazy val cache = Caffeine | ||
| .newBuilder() | ||
| .expireAfterAccess(ttlTime, TimeUnit.SECONDS) | ||
| .recordStats() | ||
| .build[FileMetaKey, FileMeta](cacheLoader) | ||
|
|
||
| /** | ||
| * Returns the `FileMeta` associated with the `FileMetaKey` in the `FileMetaCacheManager`, | ||
| * obtaining the `FileMeta` from `cacheLoader.load(FileMetaKey)` if necessary. | ||
| */ | ||
| def get(dataFile: FileMetaKey): FileMeta = cache.get(dataFile) | ||
|
|
||
| /** | ||
| * This is visible for testing. | ||
| */ | ||
| def cacheStats: CacheStats = cache.stats() | ||
|
Member
This is visible for only
Contributor (Author)
Yes, add comments |
||
|
|
||
| /** | ||
| * This is visible for testing. | ||
| */ | ||
| def cleanUp(): Unit = cache.cleanUp() | ||
| } | ||
|
|
||
| abstract class FileMetaKey { | ||
| def path: Path | ||
| def configuration: Configuration | ||
| def getFileMeta: FileMeta | ||
| override def hashCode(): Int = path.hashCode | ||
| override def equals(other: Any): Boolean = other match { | ||
| case df: FileMetaKey => path.equals(df.path) | ||
|
Member
What if the same file gets replaced? How do we invalidate the cache? This is very common in my experience, e.g., Hive overwriting a partition.
Contributor (Author)
This is a very good question; we discussed it in #33748 (comment). At the same time, I added a warning for this feature in SQLConf. The Parquet work is now a draft because of the deprecated API; we are focusing on ORC (SPARK-36516) now |
||
| case _ => false | ||
| } | ||
| } | ||
|
|
||
| trait FileMeta | ||
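The scaladoc above also mentions an `OrcFileMetaKey` that is not part of this diff. Below is a hypothetical sketch of how another format could plug into `FileMetaCacheManager`; all `Example*` names and the payload are made up for illustration.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaCacheManager, FileMetaKey}

// Hypothetical payload: whatever per-file metadata the format wants to reuse.
class ExampleFileMeta(val schemaJson: String) extends FileMeta

// Hypothetical key: identity is the file path (via FileMetaKey.hashCode/equals);
// getFileMeta is format specific and only runs on a cache miss.
case class ExampleFileMetaKey(path: Path, configuration: Configuration) extends FileMetaKey {
  override def getFileMeta: ExampleFileMeta = {
    // A real format would open `path` here and read its footer or metadata.
    new ExampleFileMeta(schemaJson = "{}")
  }
}

// The first call loads through the CacheLoader; later calls for the same path
// hit the cache until the TTL since last access expires.
val meta = FileMetaCacheManager.get(ExampleFileMetaKey(new Path("/tmp/example"), new Configuration()))
```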
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,45 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.datasources.parquet | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs.Path | ||
| import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER | ||
| import org.apache.parquet.hadoop.ParquetFileReader | ||
| import org.apache.parquet.hadoop.metadata.ParquetMetadata | ||
|
|
||
| import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaCacheManager, FileMetaKey} | ||
|
|
||
| case class ParquetFileMetaKey(path: Path, configuration: Configuration) | ||
| extends FileMetaKey { | ||
| override def getFileMeta: ParquetFileMeta = ParquetFileMeta(path, configuration) | ||
| } | ||
|
|
||
| class ParquetFileMeta(val footer: ParquetMetadata) extends FileMeta | ||
|
|
||
| object ParquetFileMeta { | ||
| def apply(path: Path, conf: Configuration): ParquetFileMeta = { | ||
| new ParquetFileMeta(ParquetFileReader.readFooter(conf, path, NO_FILTER)) | ||
| } | ||
|
|
||
| def readFooterFromCache(path: Path, conf: Configuration): ParquetMetadata = | ||
| readFooterFromCache(ParquetFileMetaKey(path, conf)) | ||
|
|
||
| def readFooterFromCache(key: ParquetFileMetaKey): ParquetMetadata = | ||
| FileMetaCacheManager.get(key).asInstanceOf[ParquetFileMeta].footer | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,6 +77,7 @@ case class ParquetPartitionReaderFactory( | |
| private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal | ||
| private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith | ||
| private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold | ||
| private val parquetMetaCacheEnabled = sqlConf.fileMetaCacheParquetEnabled | ||
| private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead | ||
| private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead | ||
|
|
||
|
|
@@ -131,8 +132,11 @@ case class ParquetPartitionReaderFactory( | |
| val filePath = new Path(new URI(file.filePath)) | ||
| val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) | ||
|
|
||
| lazy val footerFileMetaData = | ||
| lazy val footerFileMetaData = if (parquetMetaCacheEnabled) { | ||
| ParquetFileMeta.readFooterFromCache(filePath, conf).getFileMetaData | ||
|
Member
What happens if the file is removed and replaced?
Contributor (Author)
We can discuss it in #33748 first. I'll set this PR to draft first |
||
| } else { | ||
| ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData | ||
| } | ||
| val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( | ||
| footerFileMetaData.getKeyValueMetaData.get, | ||
| datetimeRebaseModeInRead) | ||
|
|
@@ -249,6 +253,12 @@ case class ParquetPartitionReaderFactory( | |
| int96RebaseMode.toString, | ||
| enableOffHeapColumnVector && taskContext.isDefined, | ||
| capacity) | ||
| // Set footer before initialize. | ||
| if (parquetMetaCacheEnabled) { | ||
| val fileMeta = | ||
| ParquetFileMeta.readFooterFromCache(split.getPath, hadoopAttemptContext.getConfiguration) | ||
| vectorizedReader.setCachedFooter(fileMeta) | ||
| } | ||
| val iter = new RecordReaderIterator(vectorizedReader) | ||
| // SPARK-23457 Register a task completion listener before `initialization`. | ||
| taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) | ||
|
|
||
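To summarize the conditional added in this file, here is a condensed Scala sketch (names simplified; `footerFor` is a hypothetical helper): with the cache enabled the footer comes from `ParquetFileMeta.readFooterFromCache`, otherwise it is read per task as before.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.metadata.FileMetaData

import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileMeta, ParquetFooterReader}

// Sketch of the decision made in ParquetPartitionReaderFactory (simplified).
def footerFor(filePath: Path, conf: Configuration, cacheEnabled: Boolean): FileMetaData = {
  if (cacheEnabled) {
    // Served from the process-wide FileMetaCacheManager when possible.
    ParquetFileMeta.readFooterFromCache(filePath, conf).getFileMetaData
  } else {
    // Previous behavior: read the footer for every task, skipping row-group details.
    ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData
  }
}
```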
Hmm, curious whether this can help if your Spark queries run as separate Spark jobs, where each of them may use different executors.
Yes, this feature does have limitations. NODE_LOCAL + thrift-server with interactive analysis should be the best scenario. If the architecture separates storage and compute, we need to consider task scheduling. In fact, in the OAP project, the fileMetaCache relies on dataCache (PROCESS_LOCAL).