From d3d6e61704a4517005bb20c1edd14b48f2122018 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Nov 2020 19:39:06 +0800 Subject: [PATCH 01/30] add simple support for Parquet --- .../apache/spark/sql/internal/SQLConf.scala | 7 ++ .../SpecificParquetRecordReaderBase.java | 35 +++++++++- .../VectorizedParquetRecordReader.java | 1 + .../DataFileMetaCacheManager.scala | 70 +++++++++++++++++++ .../parquet/ParquetFileFormat.scala | 19 ++++- .../datasources/parquet/ParquetFileMeta.scala | 43 ++++++++++++ .../ParquetPartitionReaderFactory.scala | 12 +++- 7 files changed, 182 insertions(+), 5 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataFileMetaCacheManager.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ef974dc176e51..de1618f37fd74 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -765,6 +765,11 @@ object SQLConf { .booleanConf .createWithDefault(false) + val PARQUET_META_CACHE_ENABLED = buildConf("spark.sql.parquet.metadataCache.enabled") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize") .doc("The number of rows to include in a parquet vectorized reader batch. The number should " + "be carefully chosen to minimize overhead and avoid OOMs in reading data.") @@ -3303,6 +3308,8 @@ class SQLConf extends Serializable with Logging { def parquetRecordFilterEnabled: Boolean = getConf(PARQUET_RECORD_FILTER_ENABLED) + def parquetMetaCacheEnabled: Boolean = getConf(PARQUET_META_CACHE_ENABLED) + def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) def inMemoryTableScanStatisticsEnabled: Boolean = getConf(IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index c975e52734e01..2f5dd499cb5c7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; +import com.google.common.collect.Lists; import scala.Option; import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; @@ -88,6 +89,8 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader offsets = new HashSet<>(); for (long offset : rowGroupOffsets) { offsets.add(offset); @@ -244,6 +247,34 @@ public void close() throws IOException { } } + public void setCachedFooter(ParquetMetadata cachedFooter) { + this.cachedFooter = cachedFooter; + } + + private ParquetMetadata getFooterByRange(Configuration configuration, long start, long end) throws IOException { + if (cachedFooter != null) { + List filteredBlocks = Lists.newArrayList(); + List blocks = cachedFooter.getBlocks(); + for (BlockMetaData block : blocks) { + long offset = block.getStartingPos(); + if (offset >= start && offset < end) { + filteredBlocks.add(block); 
+ } + } + return new ParquetMetadata(cachedFooter.getFileMetaData(), filteredBlocks); + } else { + return readFooter(configuration, file, range(start, end)); + } + } + + private ParquetMetadata getFooter(Configuration configuration) throws IOException { + if (cachedFooter != null) { + return cachedFooter; + } else { + return readFooter(configuration, file, NO_FILTER); + } + } + /** * Utility classes to abstract over different way to read ints with different encodings. * TODO: remove this layer of abstraction? diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 9d38a74a2956a..a1a8f745e01d6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.Type; import org.apache.spark.memory.MemoryMode; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataFileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataFileMetaCacheManager.scala new file mode 100644 index 0000000000000..19ab2fea0a3a5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataFileMetaCacheManager.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.util.concurrent.TimeUnit + +import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalListener, RemovalNotification} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.internal.Logging + +private[sql] object DataFileMetaCacheManager extends Logging { + type ENTRY = FileMetaKey + + // Need configurable + private val cache = + CacheBuilder + .newBuilder() + .concurrencyLevel(4) // DEFAULT_CONCURRENCY_LEVEL TODO verify that if it works + .expireAfterAccess(1000, TimeUnit.SECONDS) // auto expire after 1000 seconds. 
+ .removalListener(new RemovalListener[ENTRY, FileMeta]() { + override def onRemoval(n: RemovalNotification[ENTRY, FileMeta]): Unit = { + logDebug(s"Evicting Data File Meta ${n.getKey.path}") + } + }) + .build[ENTRY, FileMeta](new CacheLoader[ENTRY, FileMeta]() { + override def load(entry: ENTRY) + : FileMeta = { + logDebug(s"Loading Data File Meta ${entry.path}") + entry.getFileMeta + } + }) + + def get(dataFile: FileMetaKey): FileMeta = { + cache.get(dataFile) + } + + def stop(): Unit = { + cache.cleanUp() + } +} + +abstract class FileMetaKey { + def path: Path + def configuration: Configuration + def getFileMeta: FileMeta + override def hashCode(): Int = path.hashCode + override def equals(other: Any): Boolean = other match { + case df: FileMetaKey => path.equals(df.path) + case _ => false + } +} + +trait FileMeta diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 1901f5575470e..e4f6df558db4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -227,6 +227,10 @@ class ParquetFileFormat SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + hadoopConf.setBoolean( + SQLConf.PARQUET_META_CACHE_ENABLED.key, + sparkSession.sessionState.conf.parquetMetaCacheEnabled) + val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) @@ -266,9 +270,15 @@ class ParquetFileFormat null) val sharedConf = broadcastedHadoopConf.value.value + val metaCacheEnabled = + sharedConf.getBoolean(SQLConf.PARQUET_META_CACHE_ENABLED.key, false) - lazy val footerFileMetaData = + lazy val footerFileMetaData = if (metaCacheEnabled) { + DataFileMetaCacheManager.get(ParquetFileMetaKey(filePath, sharedConf)) + .asInstanceOf[ParquetFileMeta].footer.getFileMetaData + } else { ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + } // Try to push down filters when filter push-down is enabled. val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema @@ -323,6 +333,13 @@ class ParquetFileFormat int96RebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) + // Set footer before initialize. + if (metaCacheEnabled) { + val fileMeta = DataFileMetaCacheManager + .get(ParquetFileMetaKey(filePath, sharedConf)) + .asInstanceOf[ParquetFileMeta] + vectorizedReader.setCachedFooter(fileMeta.footer) + } val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion listener before `initialization`. taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala new file mode 100644 index 0000000000000..469fbab1545c8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import com.google.common.collect.Lists +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.hadoop.metadata.ParquetMetadata + +import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaKey} + +private[sql] case class ParquetFileMetaKey(path: Path, configuration: Configuration) + extends FileMetaKey { + override def getFileMeta: ParquetFileMeta = ParquetFileMeta(path, configuration) +} + +class ParquetFileMeta(val footer: ParquetMetadata) extends FileMeta { + def copyFileMeta: ParquetMetadata = + new ParquetMetadata(footer.getFileMetaData, Lists.newArrayList(footer.getBlocks)) +} + +object ParquetFileMeta { + def apply(path: Path, conf: Configuration): ParquetFileMeta = { + new ParquetFileMeta(ParquetFileReader.readFooter(conf, path, NO_FILTER)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index e4d5e9b2d9f6d..ed98fc23f70aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -26,14 +26,13 @@ import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetInputSplit, ParquetRecordReader} - import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} -import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.{DataFileMetaCacheManager, DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf @@ -74,6 +73,7 @@ case class ParquetPartitionReaderFactory( private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + private val parquetMetaCacheEnabled = 
sqlConf.parquetMetaCacheEnabled override def supportColumnarReads(partition: InputPartition): Boolean = { sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled && @@ -225,6 +225,14 @@ case class ParquetPartitionReaderFactory( private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = { val vectorizedReader = buildReaderBase(file, createParquetVectorizedReader) .asInstanceOf[VectorizedParquetRecordReader] + // Set footer before initialize. + if (parquetMetaCacheEnabled) { + val filePath = new Path(new URI(file.filePath)) + val fileMeta = DataFileMetaCacheManager + .get(ParquetFileMetaKey(filePath, broadcastedConf.value.value)) + .asInstanceOf[ParquetFileMeta] + vectorizedReader.setCachedFooter(fileMeta.footer) + } vectorizedReader.initBatch(partitionSchema, file.partitionValues) vectorizedReader } From c7c9736881bbbb047fe7eb7927a5ead10051ddd7 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Nov 2020 19:44:26 +0800 Subject: [PATCH 02/30] use a footer copy --- .../datasources/parquet/SpecificParquetRecordReaderBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 2f5dd499cb5c7..ad9bec34c0761 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -269,7 +269,7 @@ private ParquetMetadata getFooterByRange(Configuration configuration, long start private ParquetMetadata getFooter(Configuration configuration) throws IOException { if (cachedFooter != null) { - return cachedFooter; + return new ParquetMetadata(cachedFooter.getFileMetaData(), cachedFooter.getBlocks()); } else { return readFooter(configuration, file, NO_FILTER); } From f0ac38925774c8b1da377cc78c3b2bb7c3265531 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Nov 2020 19:45:12 +0800 Subject: [PATCH 03/30] remove copy method --- .../sql/execution/datasources/parquet/ParquetFileMeta.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala index 469fbab1545c8..96eb53a2e2bc8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.datasources.parquet -import com.google.common.collect.Lists import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER @@ -31,10 +30,7 @@ private[sql] case class ParquetFileMetaKey(path: Path, configuration: Configurat override def getFileMeta: ParquetFileMeta = ParquetFileMeta(path, configuration) } -class ParquetFileMeta(val footer: ParquetMetadata) extends FileMeta { - def copyFileMeta: ParquetMetadata = - new ParquetMetadata(footer.getFileMetaData, Lists.newArrayList(footer.getBlocks)) -} +class ParquetFileMeta(val footer: ParquetMetadata) extends FileMeta object ParquetFileMeta { def apply(path: Path, 
conf: Configuration): ParquetFileMeta = { From 83577716a7ec03faf7d4ecdc86ca30afd942d947 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Nov 2020 19:46:46 +0800 Subject: [PATCH 04/30] rename FileMetaCacheManager --- ...aFileMetaCacheManager.scala => FileMetaCacheManager.scala} | 2 +- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 4 ++-- .../v2/parquet/ParquetPartitionReaderFactory.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/{DataFileMetaCacheManager.scala => FileMetaCacheManager.scala} (97%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataFileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala similarity index 97% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataFileMetaCacheManager.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala index 19ab2fea0a3a5..d43ecc264aec4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataFileMetaCacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging -private[sql] object DataFileMetaCacheManager extends Logging { +private[sql] object FileMetaCacheManager extends Logging { type ENTRY = FileMetaKey // Need configurable diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index e4f6df558db4b..4784558be1368 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -274,7 +274,7 @@ class ParquetFileFormat sharedConf.getBoolean(SQLConf.PARQUET_META_CACHE_ENABLED.key, false) lazy val footerFileMetaData = if (metaCacheEnabled) { - DataFileMetaCacheManager.get(ParquetFileMetaKey(filePath, sharedConf)) + FileMetaCacheManager.get(ParquetFileMetaKey(filePath, sharedConf)) .asInstanceOf[ParquetFileMeta].footer.getFileMetaData } else { ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData @@ -335,7 +335,7 @@ class ParquetFileFormat capacity) // Set footer before initialize. 
if (metaCacheEnabled) { - val fileMeta = DataFileMetaCacheManager + val fileMeta = FileMetaCacheManager .get(ParquetFileMetaKey(filePath, sharedConf)) .asInstanceOf[ParquetFileMeta] vectorizedReader.setCachedFooter(fileMeta.footer) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index ed98fc23f70aa..f6aef22e0ccab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} -import org.apache.spark.sql.execution.datasources.{DataFileMetaCacheManager, DataSourceUtils, PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf @@ -228,7 +228,7 @@ case class ParquetPartitionReaderFactory( // Set footer before initialize. if (parquetMetaCacheEnabled) { val filePath = new Path(new URI(file.filePath)) - val fileMeta = DataFileMetaCacheManager + val fileMeta = FileMetaCacheManager .get(ParquetFileMetaKey(filePath, broadcastedConf.value.value)) .asInstanceOf[ParquetFileMeta] vectorizedReader.setCachedFooter(fileMeta.footer) From 0b0ecf4be4275868b8e94f3a27d848c500e7dbc9 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 24 Nov 2020 21:27:39 +0800 Subject: [PATCH 05/30] fix format --- .../datasources/v2/parquet/ParquetPartitionReaderFactory.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index f6aef22e0ccab..8abc441014a78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -26,13 +26,14 @@ import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetInputSplit, ParquetRecordReader} + import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} -import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, DataSourceUtils, PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileMetaCacheManager, PartitionedFile, RecordReaderIterator} import 
org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf From a1127913a75484ed7cb011db218234919556eb2e Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 25 Nov 2020 16:07:39 +0800 Subject: [PATCH 06/30] parquet v2 --- .../datasources/FileMetaCacheManager.scala | 2 +- .../ParquetPartitionReaderFactory.scala | 22 +++++++++++-------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala index d43ecc264aec4..b669d85a25510 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -29,7 +29,7 @@ private[sql] object FileMetaCacheManager extends Logging { type ENTRY = FileMetaKey // Need configurable - private val cache = + private lazy val cache = CacheBuilder .newBuilder() .concurrencyLevel(4) // DEFAULT_CONCURRENCY_LEVEL TODO verify that if it works diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 8abc441014a78..99bfa2d652d8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -134,8 +134,12 @@ case class ParquetPartitionReaderFactory( Array.empty, null) - lazy val footerFileMetaData = + lazy val footerFileMetaData = if (parquetMetaCacheEnabled) { + FileMetaCacheManager.get(ParquetFileMetaKey(filePath, conf)) + .asInstanceOf[ParquetFileMeta].footer.getFileMetaData + } else { ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData + } // Try to push down filters when filter push-down is enabled. val pushed = if (enableParquetFilterPushDown) { val parquetSchema = footerFileMetaData.getSchema @@ -226,14 +230,7 @@ case class ParquetPartitionReaderFactory( private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = { val vectorizedReader = buildReaderBase(file, createParquetVectorizedReader) .asInstanceOf[VectorizedParquetRecordReader] - // Set footer before initialize. - if (parquetMetaCacheEnabled) { - val filePath = new Path(new URI(file.filePath)) - val fileMeta = FileMetaCacheManager - .get(ParquetFileMetaKey(filePath, broadcastedConf.value.value)) - .asInstanceOf[ParquetFileMeta] - vectorizedReader.setCachedFooter(fileMeta.footer) - } + vectorizedReader.initBatch(partitionSchema, file.partitionValues) vectorizedReader } @@ -253,6 +250,13 @@ case class ParquetPartitionReaderFactory( int96RebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) + // Set footer before initialize. + if (parquetMetaCacheEnabled) { + val fileMeta = FileMetaCacheManager + .get(ParquetFileMetaKey(split.getPath, hadoopAttemptContext.getConfiguration)) + .asInstanceOf[ParquetFileMeta] + vectorizedReader.setCachedFooter(fileMeta.footer) + } val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion listener before `initialization`. 
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) From 3e2db1a1cdec3df84e9ceb9cc64860b7f88c6720 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 25 Nov 2020 16:28:15 +0800 Subject: [PATCH 07/30] fix format --- .../datasources/parquet/SpecificParquetRecordReaderBase.java | 3 ++- .../datasources/parquet/VectorizedParquetRecordReader.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index ad9bec34c0761..9190f5e453263 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -251,7 +251,8 @@ public void setCachedFooter(ParquetMetadata cachedFooter) { this.cachedFooter = cachedFooter; } - private ParquetMetadata getFooterByRange(Configuration configuration, long start, long end) throws IOException { + private ParquetMetadata getFooterByRange(Configuration configuration, + long start, long end) throws IOException { if (cachedFooter != null) { List filteredBlocks = Lists.newArrayList(); List blocks = cachedFooter.getBlocks(); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index a1a8f745e01d6..9d38a74a2956a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -26,7 +26,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.Type; import org.apache.spark.memory.MemoryMode; From 92d2f371cc788154422139041b9554ad9df066f4 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 25 Nov 2020 19:08:59 +0800 Subject: [PATCH 08/30] add ttl conf --- .../apache/spark/sql/internal/SQLConf.scala | 9 +++++ .../datasources/FileMetaCacheManager.scala | 39 +++++++++++-------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index de1618f37fd74..176e35c3953b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -766,10 +766,19 @@ object SQLConf { .createWithDefault(false) val PARQUET_META_CACHE_ENABLED = buildConf("spark.sql.parquet.metadataCache.enabled") + .doc("To indicate if enable parquet file meta cache, it is recommended to enabled " + + "this config in long-running process mode, such as Thrift Server, default is false") .version("3.1.0") .booleanConf .createWithDefault(false) + val PARQUET_META_CACHE_TTL_SINCE_LAST_ACCESS = + buildConf("spark.sql.parquet.metadataCache.ttl.sinceLastAccess") + .version("3.1.0") + .doc("Time-to-live for parquet metadata cache entry after last access, the unit is seconds.") + 
.timeConf(TimeUnit.SECONDS) + .createWithDefault(1000L) + val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize") .doc("The number of rows to include in a parquet vectorized reader batch. The number should " + "be carefully chosen to minimize overhead and avoid OOMs in reading data.") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala index b669d85a25510..c9b2722456b69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -23,29 +23,36 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalListener, Remo import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging +import org.apache.spark.sql.internal.SQLConf private[sql] object FileMetaCacheManager extends Logging { - type ENTRY = FileMetaKey - // Need configurable + private lazy val removalListener = new RemovalListener[FileMetaKey, FileMeta]() { + override def onRemoval(n: RemovalNotification[FileMetaKey, FileMeta]): Unit = { + logDebug(s"Evicting Data File Meta ${n.getKey.path}") + } + } + + private lazy val cacheLoader = new CacheLoader[FileMetaKey, FileMeta]() { + override def load(entry: FileMetaKey) + : FileMeta = { + logDebug(s"Loading Data File Meta ${entry.path}") + entry.getFileMeta + } + } + + private lazy val ttlTime = + SparkEnv.get.conf.get(SQLConf.PARQUET_META_CACHE_TTL_SINCE_LAST_ACCESS) + private lazy val cache = CacheBuilder .newBuilder() - .concurrencyLevel(4) // DEFAULT_CONCURRENCY_LEVEL TODO verify that if it works - .expireAfterAccess(1000, TimeUnit.SECONDS) // auto expire after 1000 seconds. 
- .removalListener(new RemovalListener[ENTRY, FileMeta]() { - override def onRemoval(n: RemovalNotification[ENTRY, FileMeta]): Unit = { - logDebug(s"Evicting Data File Meta ${n.getKey.path}") - } - }) - .build[ENTRY, FileMeta](new CacheLoader[ENTRY, FileMeta]() { - override def load(entry: ENTRY) - : FileMeta = { - logDebug(s"Loading Data File Meta ${entry.path}") - entry.getFileMeta - } - }) + .expireAfterAccess(ttlTime, TimeUnit.SECONDS) + .recordStats() + .removalListener(removalListener) + .build[FileMetaKey, FileMeta](cacheLoader) def get(dataFile: FileMetaKey): FileMeta = { cache.get(dataFile) From b8b45ecb0a976f3f0cc2b25a77496f2a53ceacb4 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 25 Nov 2020 21:04:14 +0800 Subject: [PATCH 09/30] add a test case --- .../datasources/FileMetaCacheManager.scala | 12 +++---- .../parquet/ParquetQuerySuite.scala | 36 ++++++++++++++++++- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala index c9b2722456b69..1c4f3b9ba7db9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.util.concurrent.TimeUnit -import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalListener, RemovalNotification} +import com.google.common.cache.{CacheBuilder, CacheLoader, CacheStats, RemovalListener, RemovalNotification} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -54,13 +54,11 @@ private[sql] object FileMetaCacheManager extends Logging { .removalListener(removalListener) .build[FileMetaKey, FileMeta](cacheLoader) - def get(dataFile: FileMetaKey): FileMeta = { - cache.get(dataFile) - } + def get(dataFile: FileMetaKey): FileMeta = cache.get(dataFile) - def stop(): Unit = { - cache.cleanUp() - } + def cacheStats: CacheStats = cache.stats() + + def cleanUp(): Unit = cache.cleanUp() } abstract class FileMetaKey { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 05d305a9b52ba..cba34becdb6f1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol +import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, SQLHadoopMapReduceCommitProtocol} import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan @@ -840,6 +840,40 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "INT96", toTsType = 
"TIMESTAMP_MICROS") testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } + + test("SPARK-33449: simple select queries with file meta cache") { + withSQLConf(SQLConf.PARQUET_META_CACHE_ENABLED.key -> "true", + SQLConf.PARQUET_META_CACHE_TTL_SINCE_LAST_ACCESS.key -> "5") { + val tableName = "parquet_use_meta_cache" + withTable(tableName) { + (0 until 10).map(i => (i, i.toString)).toDF("id", "value") + .write.saveAsTable(tableName) + try { + val statsBeforeQuery = FileMetaCacheManager.cacheStats + checkAnswer(sql("SELECT id FROM parquet_use_meta_cache where id > 5"), + (6 until 10).map(Row.apply(_))) + val statsAfterQuery1 = FileMetaCacheManager.cacheStats + // The 1st query triggers 4 times file meta read: 2 times related to + // push down filter and 2 times related to file read. The 1st query + // run twice: df.collect() and df.rdd.count(), so it triggers 8 times + // file meta read in total. missCount is 2 because cache is empty and + // 2 meta of files need load, other 6 times will use meta in cache. + assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2) + assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6) + checkAnswer(sql("SELECT id FROM parquet_use_meta_cache where id < 5"), + (0 until 5).map(Row.apply(_))) + val statsAfterQuery2 = FileMetaCacheManager.cacheStats + // The 2nd query also triggers 8 times file meta read in total and + // all read from meta cache, so missCount no growth and hitCount + // increase 8 times. + assert(statsAfterQuery2.missCount() - statsAfterQuery1.missCount() == 0) + assert(statsAfterQuery2.hitCount() - statsAfterQuery1.hitCount() == 8) + } finally { + FileMetaCacheManager.cleanUp() + } + } + } + } } class ParquetV1QuerySuite extends ParquetQuerySuite { From c63d7cbd23d5cc30a106940e84881234e72e057b Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 25 Nov 2020 21:05:04 +0800 Subject: [PATCH 10/30] add a test case --- .../sql/execution/datasources/parquet/ParquetQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index cba34becdb6f1..1d874e8338c44 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -857,7 +857,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS // push down filter and 2 times related to file read. The 1st query // run twice: df.collect() and df.rdd.count(), so it triggers 8 times // file meta read in total. missCount is 2 because cache is empty and - // 2 meta of files need load, other 6 times will use meta in cache. + // 2 meta files need load, other 6 times will use meta in cache. 
assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2) assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6) checkAnswer(sql("SELECT id FROM parquet_use_meta_cache where id < 5"), From 7ff0502fb785f8c2f1297b83d9bd17d4b6ad5138 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 25 Nov 2020 21:06:05 +0800 Subject: [PATCH 11/30] add a test case --- .../sql/execution/datasources/parquet/ParquetQuerySuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 1d874e8338c44..92eb535dbcfb3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -842,8 +842,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } test("SPARK-33449: simple select queries with file meta cache") { - withSQLConf(SQLConf.PARQUET_META_CACHE_ENABLED.key -> "true", - SQLConf.PARQUET_META_CACHE_TTL_SINCE_LAST_ACCESS.key -> "5") { + withSQLConf(SQLConf.PARQUET_META_CACHE_ENABLED.key -> "true") { val tableName = "parquet_use_meta_cache" withTable(tableName) { (0 until 10).map(i => (i, i.toString)).toDF("id", "value") From 44ca052218dfac88f28f5b0e6cdc0eaf613d3bf7 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 25 Nov 2020 21:06:31 +0800 Subject: [PATCH 12/30] add a test case --- .../sql/execution/datasources/parquet/ParquetQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 92eb535dbcfb3..828ce8354185f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -856,7 +856,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS // push down filter and 2 times related to file read. The 1st query // run twice: df.collect() and df.rdd.count(), so it triggers 8 times // file meta read in total. missCount is 2 because cache is empty and - // 2 meta files need load, other 6 times will use meta in cache. + // 2 meta files need load, other 6 times will read meta from cache. 
assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2) assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6) checkAnswer(sql("SELECT id FROM parquet_use_meta_cache where id < 5"), From 7254d884c1a387627b98b8761c867127a726e422 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 25 Nov 2020 21:28:35 +0800 Subject: [PATCH 13/30] use table name --- .../sql/execution/datasources/parquet/ParquetQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 828ce8354185f..05b66caa778a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -849,7 +849,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS .write.saveAsTable(tableName) try { val statsBeforeQuery = FileMetaCacheManager.cacheStats - checkAnswer(sql("SELECT id FROM parquet_use_meta_cache where id > 5"), + checkAnswer(sql(s"SELECT id FROM $tableName where id > 5"), (6 until 10).map(Row.apply(_))) val statsAfterQuery1 = FileMetaCacheManager.cacheStats // The 1st query triggers 4 times file meta read: 2 times related to @@ -859,7 +859,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS // 2 meta files need load, other 6 times will read meta from cache. assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2) assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6) - checkAnswer(sql("SELECT id FROM parquet_use_meta_cache where id < 5"), + checkAnswer(sql(s"SELECT id FROM $tableName where id < 5"), (0 until 5).map(Row.apply(_))) val statsAfterQuery2 = FileMetaCacheManager.cacheStats // The 2nd query also triggers 8 times file meta read in total and From bc25c4e414c4da2058ef8ae6e24769fb66fcd784 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 25 Nov 2020 22:42:45 +0800 Subject: [PATCH 14/30] Add meta cache support for orc --- .../apache/spark/sql/internal/SQLConf.scala | 23 ++++++--- .../orc/OrcColumnarBatchReader.java | 10 +++- .../datasources/FileMetaCacheManager.scala | 2 +- .../datasources/orc/OrcFileFormat.scala | 16 ++++++- .../datasources/orc/OrcFileMeta.scala | 48 +++++++++++++++++++ .../v2/orc/OrcPartitionReaderFactory.scala | 26 ++++++++-- .../datasources/orc/OrcQuerySuite.scala | 35 +++++++++++++- 7 files changed, 145 insertions(+), 15 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 176e35c3953b2..04d55d30ebdb4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -772,13 +772,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val PARQUET_META_CACHE_TTL_SINCE_LAST_ACCESS = - buildConf("spark.sql.parquet.metadataCache.ttl.sinceLastAccess") - .version("3.1.0") - .doc("Time-to-live for parquet metadata cache entry after last access, the unit is seconds.") - .timeConf(TimeUnit.SECONDS) - .createWithDefault(1000L) - val PARQUET_VECTORIZED_READER_BATCH_SIZE = 
buildConf("spark.sql.parquet.columnarReaderBatchSize") .doc("The number of rows to include in a parquet vectorized reader batch. The number should " + "be carefully chosen to minimize overhead and avoid OOMs in reading data.") @@ -832,6 +825,20 @@ object SQLConf { .booleanConf .createWithDefault(false) + val ORC_META_CACHE_ENABLED = buildConf("spark.sql.orc.metadataCache.enabled") + .doc("To indicate if enable orc file meta cache, it is recommended to enabled " + + "this config in long-running process mode, such as Thrift Server, default is false") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS = + buildConf("spark.sql.metadataCache.ttl.sinceLastAccess") + .version("3.1.0") + .doc("Time-to-live for file metadata cache entry after last access, the unit is seconds.") + .timeConf(TimeUnit.SECONDS) + .createWithDefault(1000L) + val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath") .doc("When true, check all the partition paths under the table\'s root directory " + "when reading data stored in HDFS. This configuration will be deprecated in the future " + @@ -3130,6 +3137,8 @@ class SQLConf extends Serializable with Logging { def orcVectorizedReaderBatchSize: Int = getConf(ORC_VECTORIZED_READER_BATCH_SIZE) + def orcMetaCacheEnabled: Boolean = getConf(ORC_META_CACHE_ENABLED) + def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION) def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java index 6a4b116cdef0b..e077e1d23f59d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java @@ -29,6 +29,7 @@ import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.OrcTail; import org.apache.orc.mapred.OrcInputFormat; import org.apache.spark.sql.catalyst.InternalRow; @@ -74,6 +75,8 @@ public class OrcColumnarBatchReader extends RecordReader { // The wrapped ORC column vectors. private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers; + private OrcTail cachedTail; + public OrcColumnarBatchReader(int capacity) { this.capacity = capacity; } @@ -124,7 +127,8 @@ public void initialize( fileSplit.getPath(), OrcFile.readerOptions(conf) .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) - .filesystem(fileSplit.getPath().getFileSystem(conf))); + .filesystem(fileSplit.getPath().getFileSystem(conf)) + .orcTail(cachedTail)); Reader.Options options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength()); recordReader = reader.rows(options); @@ -188,6 +192,10 @@ public void initBatch( columnarBatch = new ColumnarBatch(orcVectorWrappers); } + public void setCachedTail(OrcTail cachedTail) { + this.cachedTail = cachedTail; + } + /** * Return true if there exists more data in the next batch. If exists, prepare the next batch * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala index 1c4f3b9ba7db9..f3e4d7e019a2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -44,7 +44,7 @@ private[sql] object FileMetaCacheManager extends Logging { } private lazy val ttlTime = - SparkEnv.get.conf.get(SQLConf.PARQUET_META_CACHE_TTL_SINCE_LAST_ACCESS) + SparkEnv.get.conf.get(SQLConf.FILE_META_CACHE_TTL_SINCE_LAST_ACCESS) private lazy val cache = CacheBuilder diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 2671682e18f31..3ca24cf17e34b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -169,11 +170,19 @@ class OrcFileFormat (file: PartitionedFile) => { val conf = broadcastedConf.value.value + val metaCacheEnabled = + conf.getBoolean(SQLConf.ORC_META_CACHE_ENABLED.key, false) val filePath = new Path(new URI(file.filePath)) val fs = filePath.getFileSystem(conf) - val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) + val readerOptions = if (metaCacheEnabled) { + val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) + .asInstanceOf[OrcFileMeta].tail + OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) + } else { + OrcFile.readerOptions(conf).filesystem(fs) + } val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( @@ -213,6 +222,11 @@ class OrcFileFormat val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) + if (metaCacheEnabled) { + val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) + .asInstanceOf[OrcFileMeta].tail + batchReader.setCachedTail(tail) + } batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( TypeDescription.fromString(resultSchemaString), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala new file mode 100644 index 0000000000000..3c8c0344423f5 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.orc.OrcFile +import org.apache.orc.impl.{OrcTail, ReaderImpl} + +import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaKey} +import org.apache.spark.util.Utils + +private[sql] case class OrcFileMetaKey(path: Path, configuration: Configuration) + extends FileMetaKey { + override def getFileMeta: OrcFileMeta = OrcFileMeta(path, configuration) +} + +case class OrcFileMeta(tail: OrcTail) extends FileMeta + +object OrcFileMeta { + def apply(path: Path, conf: Configuration): OrcFileMeta = { + val fs = path.getFileSystem(conf) + val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) + Utils.tryWithResource(new ForTailCacheReader(path, readerOptions)) { fileReader => + new OrcFileMeta(fileReader.getOrcTail) + } + } +} + +class ForTailCacheReader(path: Path, options: OrcFile.ReaderOptions) + extends ReaderImpl(path, options) { + def getOrcTail: OrcTail = tail +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index b0ddee0a6b336..a5f0ab3833f9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -30,8 +30,8 @@ import org.apache.orc.mapreduce.OrcInputFormat import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} -import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils} +import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, PartitionedFile} +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFileMeta, OrcFileMetaKey, OrcFilters, OrcUtils} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter @@ -60,6 +60,7 @@ case class OrcPartitionReaderFactory( private val capacity = sqlConf.orcVectorizedReaderBatchSize private val orcFilterPushDown = sqlConf.orcFilterPushDown private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles + private val orcMetaCacheEnabled = sqlConf.orcMetaCacheEnabled override def supportColumnarReads(partition: InputPartition): Boolean = { sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled && @@ -87,7 +88,13 @@ case class OrcPartitionReaderFactory( pushDownPredicates(filePath, conf) val fs = filePath.getFileSystem(conf) - val readerOptions = 
OrcFile.readerOptions(conf).filesystem(fs) + val readerOptions = if (orcMetaCacheEnabled) { + val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) + .asInstanceOf[OrcFileMeta].tail + OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) + } else { + OrcFile.readerOptions(conf).filesystem(fs) + } val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( @@ -135,7 +142,13 @@ case class OrcPartitionReaderFactory( pushDownPredicates(filePath, conf) val fs = filePath.getFileSystem(conf) - val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) + val readerOptions = if (orcMetaCacheEnabled) { + val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) + .asInstanceOf[OrcFileMeta].tail + OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) + } else { + OrcFile.readerOptions(conf).filesystem(fs) + } val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( @@ -158,6 +171,11 @@ case class OrcPartitionReaderFactory( val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId) val batchReader = new OrcColumnarBatchReader(capacity) + if (orcMetaCacheEnabled) { + val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) + .asInstanceOf[OrcFileMeta].tail + batchReader.setCachedTail(tail) + } batchReader.initialize(fileSplit, taskAttemptContext) val requestedPartitionColIds = Array.fill(readDataSchema.length)(-1) ++ Range(0, partitionSchema.length) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index ead2c2cf1b70f..6653238600c6a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -34,7 +34,7 @@ import org.apache.orc.mapreduce.OrcInputFormat import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, HadoopFsRelation, LogicalRelation, RecordReaderIterator} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -713,6 +713,39 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { } } } + + test("SPARK-33449: simple select queries with file meta cache") { + withSQLConf(SQLConf.ORC_META_CACHE_ENABLED.key -> "true") { + val tableName = "orc_use_meta_cache" + withTable(tableName) { + (0 until 10).map(i => (i, i.toString)).toDF("id", "value") + .write.format("orc").saveAsTable(tableName) + try { + val statsBeforeQuery = FileMetaCacheManager.cacheStats + checkAnswer(sql(s"SELECT id FROM $tableName where id > 5"), + (6 until 10).map(Row.apply(_))) + val statsAfterQuery1 = FileMetaCacheManager.cacheStats + // The 1st query triggers 4 times file meta read: 2 times related to + // push down filter and 2 times related to file read. The 1st query + // run twice: df.collect() and df.rdd.count(), so it triggers 8 times + // file meta read in total. 
missCount is 2 because cache is empty and + // 2 meta files need load, other 6 times will read meta from cache. + assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2) + assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6) + checkAnswer(sql(s"SELECT id FROM $tableName where id < 5"), + (0 until 5).map(Row.apply(_))) + val statsAfterQuery2 = FileMetaCacheManager.cacheStats + // The 2nd query also triggers 8 times file meta read in total and + // all read from meta cache, so missCount no growth and hitCount + // increase 8 times. + assert(statsAfterQuery2.missCount() - statsAfterQuery1.missCount() == 0) + assert(statsAfterQuery2.hitCount() - statsAfterQuery1.hitCount() == 8) + } finally { + FileMetaCacheManager.cleanUp() + } + } + } + } } class OrcV1QuerySuite extends OrcQuerySuite { From 6079adc3db14294b6bd0014d0c17bf3a3ca6aac1 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 22 Dec 2020 00:56:24 +0800 Subject: [PATCH 15/30] update conf version --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 04d55d30ebdb4..7f16f4f82a078 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -768,7 +768,7 @@ object SQLConf { val PARQUET_META_CACHE_ENABLED = buildConf("spark.sql.parquet.metadataCache.enabled") .doc("To indicate if enable parquet file meta cache, it is recommended to enabled " + "this config in long-running process mode, such as Thrift Server, default is false") - .version("3.1.0") + .version("3.2.0") .booleanConf .createWithDefault(false) @@ -828,13 +828,13 @@ object SQLConf { val ORC_META_CACHE_ENABLED = buildConf("spark.sql.orc.metadataCache.enabled") .doc("To indicate if enable orc file meta cache, it is recommended to enabled " + "this config in long-running process mode, such as Thrift Server, default is false") - .version("3.1.0") + .version("3.2.0") .booleanConf .createWithDefault(false) val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS = buildConf("spark.sql.metadataCache.ttl.sinceLastAccess") - .version("3.1.0") + .version("3.2.0") .doc("Time-to-live for file metadata cache entry after last access, the unit is seconds.") .timeConf(TimeUnit.SECONDS) .createWithDefault(1000L) From 3f39531af78736f161a2b4689da7b498c908a5c1 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 20:03:37 +0800 Subject: [PATCH 16/30] config namespace and comments --- .../apache/spark/sql/internal/SQLConf.scala | 30 +++++++++---------- .../datasources/orc/OrcFileFormat.scala | 2 +- .../parquet/ParquetFileFormat.scala | 6 ++-- .../v2/orc/OrcPartitionReaderFactory.scala | 2 +- .../ParquetPartitionReaderFactory.scala | 3 +- .../datasources/orc/OrcQuerySuite.scala | 2 +- .../parquet/ParquetQuerySuite.scala | 2 +- 7 files changed, 23 insertions(+), 24 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 333b09d37a6a2..db2211494111d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -785,13 +785,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val PARQUET_META_CACHE_ENABLED = 
buildConf("spark.sql.parquet.metadataCache.enabled") - .doc("To indicate if enable parquet file meta cache, it is recommended to enabled " + - "this config in long-running process mode, such as Thrift Server, default is false") - .version("3.2.0") - .booleanConf - .createWithDefault(false) - val PARQUET_VECTORIZED_READER_BATCH_SIZE = buildConf("spark.sql.parquet.columnarReaderBatchSize") .doc("The number of rows to include in a parquet vectorized reader batch. The number should " + "be carefully chosen to minimize overhead and avoid OOMs in reading data.") @@ -845,19 +838,26 @@ object SQLConf { .booleanConf .createWithDefault(false) - val ORC_META_CACHE_ENABLED = buildConf("spark.sql.orc.metadataCache.enabled") + val FILE_META_CACHE_PARQUET_ENABLED = buildConf("spark.sql.fileMetaCache.parquet.enabled") + .doc("To indicate if enable parquet file meta cache, it is recommended to enabled " + + "this config when multiple queries are performed on the same dataset, default is false.") + .version("3.2.0") + .booleanConf + .createWithDefault(false) + + val FILE_META_CACHE_ORC_ENABLED = buildConf("spark.sql.fileMetaCache.orc.enabled") .doc("To indicate if enable orc file meta cache, it is recommended to enabled " + - "this config in long-running process mode, such as Thrift Server, default is false") + "this config when multiple queries are performed on the same dataset, default is false.") .version("3.2.0") .booleanConf .createWithDefault(false) val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS = - buildConf("spark.sql.metadataCache.ttl.sinceLastAccess") + buildConf("spark.sql.fileMetaCache.ttl.sinceLastAccess") .version("3.2.0") .doc("Time-to-live for file metadata cache entry after last access, the unit is seconds.") .timeConf(TimeUnit.SECONDS) - .createWithDefault(1000L) + .createWithDefault(3600L) val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath") .doc("When true, check all the partition paths under the table\'s root directory " + @@ -3272,14 +3272,16 @@ class SQLConf extends Serializable with Logging { def orcVectorizedReaderBatchSize: Int = getConf(ORC_VECTORIZED_READER_BATCH_SIZE) - def orcMetaCacheEnabled: Boolean = getConf(ORC_META_CACHE_ENABLED) - def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION) def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED) def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE) + def fileMetaCacheParquetEnabled: Boolean = getConf(FILE_META_CACHE_PARQUET_ENABLED) + + def fileMetaCacheOrcEnabled: Boolean = getConf(FILE_META_CACHE_ORC_ENABLED) + def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) @@ -3463,8 +3465,6 @@ class SQLConf extends Serializable with Logging { def parquetRecordFilterEnabled: Boolean = getConf(PARQUET_RECORD_FILTER_ENABLED) - def parquetMetaCacheEnabled: Boolean = getConf(PARQUET_META_CACHE_ENABLED) - def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING) def inMemoryTableScanStatisticsEnabled: Boolean = getConf(IN_MEMORY_TABLE_SCAN_STATISTICS_ENABLED) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index a27a4cbcb44bd..4d5021cbd3b89 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -171,7 +171,7 @@ class OrcFileFormat (file: PartitionedFile) => { val conf = broadcastedConf.value.value val metaCacheEnabled = - conf.getBoolean(SQLConf.ORC_META_CACHE_ENABLED.key, false) + conf.getBoolean(SQLConf.FILE_META_CACHE_ORC_ENABLED.key, false) val filePath = new Path(new URI(file.filePath)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 4784558be1368..e072453e13b73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -228,8 +228,8 @@ class ParquetFileFormat sparkSession.sessionState.conf.isParquetINT96AsTimestamp) hadoopConf.setBoolean( - SQLConf.PARQUET_META_CACHE_ENABLED.key, - sparkSession.sessionState.conf.parquetMetaCacheEnabled) + SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key, + sparkSession.sessionState.conf.fileMetaCacheParquetEnabled) val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) @@ -271,7 +271,7 @@ class ParquetFileFormat val sharedConf = broadcastedHadoopConf.value.value val metaCacheEnabled = - sharedConf.getBoolean(SQLConf.PARQUET_META_CACHE_ENABLED.key, false) + sharedConf.getBoolean(SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key, false) lazy val footerFileMetaData = if (metaCacheEnabled) { FileMetaCacheManager.get(ParquetFileMetaKey(filePath, sharedConf)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 217780b15bd44..346810912b006 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -60,7 +60,7 @@ case class OrcPartitionReaderFactory( private val capacity = sqlConf.orcVectorizedReaderBatchSize private val orcFilterPushDown = sqlConf.orcFilterPushDown private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles - private val orcMetaCacheEnabled = sqlConf.orcMetaCacheEnabled + private val orcMetaCacheEnabled = sqlConf.fileMetaCacheOrcEnabled override def supportColumnarReads(partition: InputPartition): Boolean = { sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled && diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 99bfa2d652d8e..9ce33c7629235 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -74,7 +74,7 @@ case class ParquetPartitionReaderFactory( private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold - private val parquetMetaCacheEnabled = 
sqlConf.parquetMetaCacheEnabled + private val parquetMetaCacheEnabled = sqlConf.fileMetaCacheParquetEnabled override def supportColumnarReads(partition: InputPartition): Boolean = { sqlConf.parquetVectorizedReaderEnabled && sqlConf.wholeStageEnabled && @@ -230,7 +230,6 @@ case class ParquetPartitionReaderFactory( private def createVectorizedReader(file: PartitionedFile): VectorizedParquetRecordReader = { val vectorizedReader = buildReaderBase(file, createParquetVectorizedReader) .asInstanceOf[VectorizedParquetRecordReader] - vectorizedReader.initBatch(partitionSchema, file.partitionValues) vectorizedReader } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 6653238600c6a..960db5a0969b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -715,7 +715,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { } test("SPARK-33449: simple select queries with file meta cache") { - withSQLConf(SQLConf.ORC_META_CACHE_ENABLED.key -> "true") { + withSQLConf(SQLConf.FILE_META_CACHE_ORC_ENABLED.key -> "true") { val tableName = "orc_use_meta_cache" withTable(tableName) { (0 until 10).map(i => (i, i.toString)).toDF("id", "value") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index f376a85a20950..1fa3f89a38d86 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -842,7 +842,7 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS } test("SPARK-33449: simple select queries with file meta cache") { - withSQLConf(SQLConf.PARQUET_META_CACHE_ENABLED.key -> "true") { + withSQLConf(SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key -> "true") { val tableName = "parquet_use_meta_cache" withTable(tableName) { (0 until 10).map(i => (i, i.toString)).toDF("id", "value") From 99f18f500cd46c62435561854817f51d5ea76151 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 20:05:29 +0800 Subject: [PATCH 17/30] remove guava import --- .../datasources/parquet/SpecificParquetRecordReaderBase.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 38785e77c19ca..21d34ccf0e2f1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.Set; -import com.google.common.collect.Lists; import scala.Option; import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; @@ -261,7 +260,7 @@ public void setCachedFooter(ParquetMetadata cachedFooter) { private ParquetMetadata getFooterByRange(Configuration configuration, long start, long end) throws IOException { 
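// If a footer was attached via setCachedFooter, reuse it here: keep only the row groups whose
// starting position falls inside this split's byte range, instead of reading the footer from
// the file again for every task.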
if (cachedFooter != null) { - List filteredBlocks = Lists.newArrayList(); + List filteredBlocks = new ArrayList<>(); List blocks = cachedFooter.getBlocks(); for (BlockMetaData block : blocks) { long offset = block.getStartingPos(); From 636058073147854288acd2b3f74a1e4323020bd4 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 20:08:07 +0800 Subject: [PATCH 18/30] add for testing comments --- .../datasources/FileMetaCacheManager.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala index f3e4d7e019a2d..bf726742f06b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.util.concurrent.TimeUnit -import com.google.common.cache.{CacheBuilder, CacheLoader, CacheStats, RemovalListener, RemovalNotification} +import com.google.common.cache.{CacheBuilder, CacheLoader, CacheStats} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -29,15 +29,8 @@ import org.apache.spark.sql.internal.SQLConf private[sql] object FileMetaCacheManager extends Logging { - private lazy val removalListener = new RemovalListener[FileMetaKey, FileMeta]() { - override def onRemoval(n: RemovalNotification[FileMetaKey, FileMeta]): Unit = { - logDebug(s"Evicting Data File Meta ${n.getKey.path}") - } - } - private lazy val cacheLoader = new CacheLoader[FileMetaKey, FileMeta]() { - override def load(entry: FileMetaKey) - : FileMeta = { + override def load(entry: FileMetaKey): FileMeta = { logDebug(s"Loading Data File Meta ${entry.path}") entry.getFileMeta } @@ -51,13 +44,18 @@ private[sql] object FileMetaCacheManager extends Logging { .newBuilder() .expireAfterAccess(ttlTime, TimeUnit.SECONDS) .recordStats() - .removalListener(removalListener) .build[FileMetaKey, FileMeta](cacheLoader) def get(dataFile: FileMetaKey): FileMeta = cache.get(dataFile) + /** + * This is visible for testing. + */ def cacheStats: CacheStats = cache.stats() + /** + * This is visible for testing. 
+ */ def cleanUp(): Unit = cache.cleanUp() } From 0a224ffd5c720165241a616cc06a2509b6fd8fea Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 20:10:08 +0800 Subject: [PATCH 19/30] rename config --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index db2211494111d..40888bcda0034 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -853,7 +853,7 @@ object SQLConf { .createWithDefault(false) val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS = - buildConf("spark.sql.fileMetaCache.ttl.sinceLastAccess") + buildConf("spark.sql.fileMetaCache.ttlSinceLastAccess") .version("3.2.0") .doc("Time-to-live for file metadata cache entry after last access, the unit is seconds.") .timeConf(TimeUnit.SECONDS) From 98ef2de4ad1f16d9d9637643660f207a16bd2ca6 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 25 Jan 2021 20:45:06 +0800 Subject: [PATCH 20/30] add comments --- .../datasources/FileMetaCacheManager.scala | 25 +++++++++++++------ .../datasources/orc/OrcFileMeta.scala | 6 ++--- .../datasources/parquet/ParquetFileMeta.scala | 4 +-- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala index bf726742f06b9..c94c58cd14c31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -27,6 +27,16 @@ import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf +/** + * A singleton Cache Manager to caching file meta. We cache these file metas in order to speed up + * iterated queries over the same dataset. Otherwise, each query would have to hit remote storage + * in order to fetch file meta before read files. + * + * We should implement the corresponding `FileMetaKey` for a specific file format, for example + * `ParquetFileMetaKey` or `OrcFileMetaKey`. By default, the file path is used as the identification + * of the `FileMetaKey` and the `getFileMeta` method of `FileMetaKey` is used to return the file + * meta of the corresponding file format. 
+ */ private[sql] object FileMetaCacheManager extends Logging { private lazy val cacheLoader = new CacheLoader[FileMetaKey, FileMeta]() { @@ -39,12 +49,11 @@ private[sql] object FileMetaCacheManager extends Logging { private lazy val ttlTime = SparkEnv.get.conf.get(SQLConf.FILE_META_CACHE_TTL_SINCE_LAST_ACCESS) - private lazy val cache = - CacheBuilder - .newBuilder() - .expireAfterAccess(ttlTime, TimeUnit.SECONDS) - .recordStats() - .build[FileMetaKey, FileMeta](cacheLoader) + private lazy val cache = CacheBuilder + .newBuilder() + .expireAfterAccess(ttlTime, TimeUnit.SECONDS) + .recordStats() + .build[FileMetaKey, FileMeta](cacheLoader) def get(dataFile: FileMetaKey): FileMeta = cache.get(dataFile) @@ -59,7 +68,7 @@ private[sql] object FileMetaCacheManager extends Logging { def cleanUp(): Unit = cache.cleanUp() } -abstract class FileMetaKey { +private[sql] abstract class FileMetaKey { def path: Path def configuration: Configuration def getFileMeta: FileMeta @@ -70,4 +79,4 @@ abstract class FileMetaKey { } } -trait FileMeta +private[sql] trait FileMeta diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala index 3c8c0344423f5..bee6a9c55cac0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala @@ -30,9 +30,9 @@ private[sql] case class OrcFileMetaKey(path: Path, configuration: Configuration) override def getFileMeta: OrcFileMeta = OrcFileMeta(path, configuration) } -case class OrcFileMeta(tail: OrcTail) extends FileMeta +private[sql] case class OrcFileMeta(tail: OrcTail) extends FileMeta -object OrcFileMeta { +private[sql] object OrcFileMeta { def apply(path: Path, conf: Configuration): OrcFileMeta = { val fs = path.getFileSystem(conf) val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) @@ -42,7 +42,7 @@ object OrcFileMeta { } } -class ForTailCacheReader(path: Path, options: OrcFile.ReaderOptions) +private[sql] class ForTailCacheReader(path: Path, options: OrcFile.ReaderOptions) extends ReaderImpl(path, options) { def getOrcTail: OrcTail = tail } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala index 96eb53a2e2bc8..b5b0a68a83ae6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala @@ -30,9 +30,9 @@ private[sql] case class ParquetFileMetaKey(path: Path, configuration: Configurat override def getFileMeta: ParquetFileMeta = ParquetFileMeta(path, configuration) } -class ParquetFileMeta(val footer: ParquetMetadata) extends FileMeta +private[sql] class ParquetFileMeta(val footer: ParquetMetadata) extends FileMeta -object ParquetFileMeta { +private[sql] object ParquetFileMeta { def apply(path: Path, conf: Configuration): ParquetFileMeta = { new ParquetFileMeta(ParquetFileReader.readFooter(conf, path, NO_FILTER)) } From 61175ed37aaface8b193d94fcc5f9bdae88324aa Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 9 Aug 2021 15:20:41 +0800 Subject: [PATCH 21/30] fix compile --- .../SpecificParquetRecordReaderBase.java | 46 +++++++++++++++---- .../parquet/ParquetFileFormat.scala | 19 +++++++- 
.../ParquetPartitionReaderFactory.scala | 16 ++++++- .../datasources/orc/OrcQuerySuite.scala | 39 ++++++++++++++-- .../parquet/ParquetQuerySuite.scala | 10 ++-- 5 files changed, 111 insertions(+), 19 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 6264d6341c65a..51d54ba0ed4cc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -39,11 +39,16 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.RowGroupFilter; +import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.BadConfigurationException; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.schema.MessageType; @@ -77,6 +82,8 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader fileMetadata = reader.getFileMetaData().getKeyValueMetaData(); + ParquetMetadata footer = + readFooterByRange(configuration, split.getStart(), split.getStart() + split.getLength()); + this.fileSchema = footer.getFileMetaData().getSchema(); + FilterCompat.Filter filter = ParquetInputFormat.getFilter(configuration); + List blocks = RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema); + + Map fileMetadata = footer.getFileMetaData().getKeyValueMetaData(); ReadSupport readSupport = getReadSupportInstance(getReadSupportClass(configuration)); ReadSupport.ReadContext readContext = readSupport.init(new InitContext( taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema)); this.requestedSchema = readContext.getRequestedSchema(); - reader.setRequestedSchema(requestedSchema); String sparkRequestedSchemaString = configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA()); this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString); + this.reader = new ParquetFileReader( + configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns()); this.totalRowCount = reader.getFilteredRecordCount(); // For test purpose. 
@@ -116,6 +124,28 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont } } + public void setCachedFooter(ParquetMetadata cachedFooter) { + this.cachedFooter = cachedFooter; + } + + private ParquetMetadata readFooterByRange(Configuration configuration, + long start, long end) throws IOException { + if (cachedFooter != null) { + List filteredBlocks = new ArrayList<>(); + List blocks = cachedFooter.getBlocks(); + for (BlockMetaData block : blocks) { + long offset = block.getStartingPos(); + if (offset >= start && offset < end) { + filteredBlocks.add(block); + } + } + return new ParquetMetadata(cachedFooter.getFileMetaData(), filteredBlocks); + } else { + return ParquetFileReader + .readFooter(configuration, file, ParquetMetadataConverter.range(start, end)); + } + } + /** * Returns the list of files at 'path' recursively. This skips files that are ignored normally * by MapReduce. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index e2fe5b4486213..95b205162200e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -228,6 +228,10 @@ class ParquetFileFormat SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sparkSession.sessionState.conf.isParquetINT96AsTimestamp) + hadoopConf.setBoolean( + SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key, + sparkSession.sessionState.conf.fileMetaCacheParquetEnabled) + val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) @@ -263,9 +267,15 @@ class ParquetFileFormat val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) val sharedConf = broadcastedHadoopConf.value.value + val metaCacheEnabled = + sharedConf.getBoolean(SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key, false) - lazy val footerFileMetaData = + lazy val footerFileMetaData = if (metaCacheEnabled) { + FileMetaCacheManager.get(ParquetFileMetaKey(filePath, sharedConf)) + .asInstanceOf[ParquetFileMeta].footer.getFileMetaData + } else { ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData + } val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) @@ -327,6 +337,13 @@ class ParquetFileFormat int96RebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) + // Set footer before initialize. + if (metaCacheEnabled) { + val fileMeta = FileMetaCacheManager + .get(ParquetFileMetaKey(filePath, sharedConf)) + .asInstanceOf[ParquetFileMeta] + vectorizedReader.setCachedFooter(fileMeta.footer) + } val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion listener before `initialization`. 
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 058669b0937fa..60bbca9f2bcee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} -import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileMetaCacheManager, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf @@ -77,6 +77,7 @@ case class ParquetPartitionReaderFactory( private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + private val parquetMetaCacheEnabled = sqlConf.fileMetaCacheParquetEnabled private val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead private val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead @@ -131,8 +132,12 @@ case class ParquetPartitionReaderFactory( val filePath = new Path(new URI(file.filePath)) val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - lazy val footerFileMetaData = + lazy val footerFileMetaData = if (parquetMetaCacheEnabled) { + FileMetaCacheManager.get(ParquetFileMetaKey(filePath, conf)) + .asInstanceOf[ParquetFileMeta].footer.getFileMetaData + } else { ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData + } val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode( footerFileMetaData.getKeyValueMetaData.get, datetimeRebaseModeInRead) @@ -249,6 +254,13 @@ case class ParquetPartitionReaderFactory( int96RebaseMode.toString, enableOffHeapColumnVector && taskContext.isDefined, capacity) + // Set footer before initialize. + if (parquetMetaCacheEnabled) { + val fileMeta = FileMetaCacheManager + .get(ParquetFileMetaKey(split.getPath, hadoopAttemptContext.getConfiguration)) + .asInstanceOf[ParquetFileMeta] + vectorizedReader.setCachedFooter(fileMeta.footer) + } val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion listener before `initialization`. 
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close())) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 680c2cf2b42e4..38feb519e0a77 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -35,8 +35,8 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator} -import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, HadoopFsRelation, LogicalRelation, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -734,7 +734,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") { val readDf = spark.read.orc(path) val vectorizationEnabled = readDf.queryExecution.executedPlan.find { - case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar + case scan @ (_: FileSourceScanExec | _: MicroBatchScanExec) => scan.supportsColumnar case _ => false }.isDefined assert(vectorizationEnabled) @@ -742,6 +742,39 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { } } } + + test("SPARK-33449: simple select queries with file meta cache") { + withSQLConf(SQLConf.FILE_META_CACHE_ORC_ENABLED.key -> "true") { + val tableName = "orc_use_meta_cache" + withTable(tableName) { + (0 until 10).map(i => (i, i.toString)).toDF("id", "value") + .write.format("orc").saveAsTable(tableName) + try { + val statsBeforeQuery = FileMetaCacheManager.cacheStats + checkAnswer(sql(s"SELECT id FROM $tableName where id > 5"), + (6 until 10).map(Row.apply(_))) + val statsAfterQuery1 = FileMetaCacheManager.cacheStats + // The 1st query triggers 4 times file meta read: 2 times related to + // push down filter and 2 times related to file read. The 1st query + // run twice: df.collect() and df.rdd.count(), so it triggers 8 times + // file meta read in total. missCount is 2 because cache is empty and + // 2 meta files need load, other 6 times will read meta from cache. + assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2) + assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6) + checkAnswer(sql(s"SELECT id FROM $tableName where id < 5"), + (0 until 5).map(Row.apply(_))) + val statsAfterQuery2 = FileMetaCacheManager.cacheStats + // The 2nd query also triggers 8 times file meta read in total and + // all read from meta cache, so missCount no growth and hitCount + // increase 8 times. 
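+ // Both file footers are still cached from the 1st query (well within the TTL), so every
+ // lookup here is a hit and missCount stays flat.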
+ assert(statsAfterQuery2.missCount() - statsAfterQuery1.missCount() == 0) + assert(statsAfterQuery2.hitCount() - statsAfterQuery1.hitCount() == 8) + } finally { + FileMetaCacheManager.cleanUp() + } + } + } + } } class OrcV1QuerySuite extends OrcQuerySuite { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 9ef43995467c6..4814c8da67081 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol} import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} -import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -953,8 +953,8 @@ class ParquetV2QuerySuite extends ParquetQuerySuite { // do not return batch - whole stage codegen is disabled for wide table (>200 columns) val df2 = spark.read.parquet(path) - val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get - val parquetScan2 = fileScan2.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan] + val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[MicroBatchScanExec]).get + val parquetScan2 = fileScan2.asInstanceOf[MicroBatchScanExec].scan.asInstanceOf[ParquetScan] // The method `supportColumnarReads` in Parquet doesn't depends on the input partition. // Here we can pass null input partition to the method for testing propose. 
assert(!parquetScan2.createReaderFactory().supportColumnarReads(null)) @@ -963,8 +963,8 @@ class ParquetV2QuerySuite extends ParquetQuerySuite { // return batch val columns = Seq.tabulate(9) {i => s"c$i"} val df3 = df2.selectExpr(columns : _*) - val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get - val parquetScan3 = fileScan3.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan] + val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[MicroBatchScanExec]).get + val parquetScan3 = fileScan3.asInstanceOf[MicroBatchScanExec].scan.asInstanceOf[ParquetScan] assert(parquetScan3.createReaderFactory().supportColumnarReads(null)) checkAnswer(df3, df.selectExpr(columns : _*)) } From 0d4f3e8fde2efcdc26417ed6b9f63de287258f93 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 9 Aug 2021 15:22:12 +0800 Subject: [PATCH 22/30] fix compile --- .../datasources/parquet/ParquetQuerySuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 4814c8da67081..9ef43995467c6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol} import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} -import org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -953,8 +953,8 @@ class ParquetV2QuerySuite extends ParquetQuerySuite { // do not return batch - whole stage codegen is disabled for wide table (>200 columns) val df2 = spark.read.parquet(path) - val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[MicroBatchScanExec]).get - val parquetScan2 = fileScan2.asInstanceOf[MicroBatchScanExec].scan.asInstanceOf[ParquetScan] + val fileScan2 = df2.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get + val parquetScan2 = fileScan2.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan] // The method `supportColumnarReads` in Parquet doesn't depends on the input partition. // Here we can pass null input partition to the method for testing propose. 
assert(!parquetScan2.createReaderFactory().supportColumnarReads(null)) @@ -963,8 +963,8 @@ class ParquetV2QuerySuite extends ParquetQuerySuite { // return batch val columns = Seq.tabulate(9) {i => s"c$i"} val df3 = df2.selectExpr(columns : _*) - val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[MicroBatchScanExec]).get - val parquetScan3 = fileScan3.asInstanceOf[MicroBatchScanExec].scan.asInstanceOf[ParquetScan] + val fileScan3 = df3.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get + val parquetScan3 = fileScan3.asInstanceOf[BatchScanExec].scan.asInstanceOf[ParquetScan] assert(parquetScan3.createReaderFactory().supportColumnarReads(null)) checkAnswer(df3, df.selectExpr(columns : _*)) } From c5b827ec042bac6de3f014e852939ab2384274c8 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 9 Aug 2021 15:23:23 +0800 Subject: [PATCH 23/30] fix compile --- .../spark/sql/execution/datasources/orc/OrcQuerySuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 38feb519e0a77..9c578efd8af21 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, HadoopFsRelation, LogicalRelation, RecordReaderIterator} -import org.apache.spark.sql.execution.datasources.v2.MicroBatchScanExec +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -734,7 +734,7 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") { val readDf = spark.read.orc(path) val vectorizationEnabled = readDf.queryExecution.executedPlan.find { - case scan @ (_: FileSourceScanExec | _: MicroBatchScanExec) => scan.supportsColumnar + case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar case _ => false }.isDefined assert(vectorizationEnabled) From 179e7b0a53266ec5b69b435057598581965a552e Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 9 Aug 2021 15:27:29 +0800 Subject: [PATCH 24/30] add teste case --- .../parquet/ParquetQuerySuite.scala | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 9ef43995467c6..80183ff9e663e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec -import 
org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol} +import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol} import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan @@ -933,6 +933,39 @@ class ParquetV1QuerySuite extends ParquetQuerySuite { } } } + + test("SPARK-33449: simple select queries with file meta cache") { + withSQLConf(SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key -> "true") { + val tableName = "parquet_use_meta_cache" + withTable(tableName) { + (0 until 10).map(i => (i, i.toString)).toDF("id", "value") + .write.saveAsTable(tableName) + try { + val statsBeforeQuery = FileMetaCacheManager.cacheStats + checkAnswer(sql(s"SELECT id FROM $tableName where id > 5"), + (6 until 10).map(Row.apply(_))) + val statsAfterQuery1 = FileMetaCacheManager.cacheStats + // The 1st query triggers 4 times file meta read: 2 times related to + // push down filter and 2 times related to file read. The 1st query + // run twice: df.collect() and df.rdd.count(), so it triggers 8 times + // file meta read in total. missCount is 2 because cache is empty and + // 2 meta files need load, other 6 times will read meta from cache. + assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2) + assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6) + checkAnswer(sql(s"SELECT id FROM $tableName where id < 5"), + (0 until 5).map(Row.apply(_))) + val statsAfterQuery2 = FileMetaCacheManager.cacheStats + // The 2nd query also triggers 8 times file meta read in total and + // all read from meta cache, so missCount no growth and hitCount + // increase 8 times. 
+ assert(statsAfterQuery2.missCount() - statsAfterQuery1.missCount() == 0) + assert(statsAfterQuery2.hitCount() - statsAfterQuery1.hitCount() == 8) + } finally { + FileMetaCacheManager.cleanUp() + } + } + } + } } class ParquetV2QuerySuite extends ParquetQuerySuite { From 36f502e6e7982f5344eb7e493af8e193a01702ca Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 9 Aug 2021 15:31:09 +0800 Subject: [PATCH 25/30] add teste case --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../sql/execution/datasources/FileMetaCacheManager.scala | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6c25df9bf9658..f29d0d8f09162 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -971,20 +971,20 @@ object SQLConf { val FILE_META_CACHE_PARQUET_ENABLED = buildConf("spark.sql.fileMetaCache.parquet.enabled") .doc("To indicate if enable parquet file meta cache, it is recommended to enabled " + "this config when multiple queries are performed on the same dataset, default is false.") - .version("3.2.0") + .version("3.3.0") .booleanConf .createWithDefault(false) val FILE_META_CACHE_ORC_ENABLED = buildConf("spark.sql.fileMetaCache.orc.enabled") .doc("To indicate if enable orc file meta cache, it is recommended to enabled " + "this config when multiple queries are performed on the same dataset, default is false.") - .version("3.2.0") + .version("3.3.0") .booleanConf .createWithDefault(false) val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS = buildConf("spark.sql.fileMetaCache.ttlSinceLastAccess") - .version("3.2.0") + .version("3.3.0") .doc("Time-to-live for file metadata cache entry after last access, the unit is seconds.") .timeConf(TimeUnit.SECONDS) .createWithDefault(3600L) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala index c94c58cd14c31..9c90eb3a3804b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources import java.util.concurrent.TimeUnit -import com.google.common.cache.{CacheBuilder, CacheLoader, CacheStats} +import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine} +import com.github.benmanes.caffeine.cache.stats.CacheStats import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -49,7 +50,7 @@ private[sql] object FileMetaCacheManager extends Logging { private lazy val ttlTime = SparkEnv.get.conf.get(SQLConf.FILE_META_CACHE_TTL_SINCE_LAST_ACCESS) - private lazy val cache = CacheBuilder + private lazy val cache = Caffeine .newBuilder() .expireAfterAccess(ttlTime, TimeUnit.SECONDS) .recordStats() From 850c52b727bf36ac3019acd7cf9588d9f6ebfefc Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 9 Aug 2021 18:07:34 +0800 Subject: [PATCH 26/30] fix java style --- .../datasources/parquet/SpecificParquetRecordReaderBase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index 51d54ba0ed4cc..9d6dec45af332 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -95,8 +95,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont readFooterByRange(configuration, split.getStart(), split.getStart() + split.getLength()); this.fileSchema = footer.getFileMetaData().getSchema(); FilterCompat.Filter filter = ParquetInputFormat.getFilter(configuration); - List blocks = RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema); - + List blocks = + RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), fileSchema); Map fileMetadata = footer.getFileMetaData().getKeyValueMetaData(); ReadSupport readSupport = getReadSupportInstance(getReadSupportClass(configuration)); ReadSupport.ReadContext readContext = readSupport.init(new InitContext( From 630f8db424d5090d1c890e7dd414bc5b1960b8c6 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Tue, 10 Aug 2021 17:25:38 +0800 Subject: [PATCH 27/30] Add some helper method --- .../execution/datasources/orc/OrcFileFormat.scala | 6 ++---- .../sql/execution/datasources/orc/OrcFileMeta.scala | 8 +++++++- .../datasources/parquet/ParquetFileFormat.scala | 9 +++------ .../datasources/parquet/ParquetFileMeta.scala | 8 +++++++- .../v2/orc/OrcPartitionReaderFactory.scala | 13 +++++-------- .../v2/parquet/ParquetPartitionReaderFactory.scala | 12 +++++------- 6 files changed, 29 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 20c03660d5ecf..6f75450e86c99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -162,8 +162,7 @@ class OrcFileFormat val fs = filePath.getFileSystem(conf) val readerOptions = if (metaCacheEnabled) { - val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) - .asInstanceOf[OrcFileMeta].tail + val tail = OrcFileMeta.readTailFromCache(filePath, conf) OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) } else { OrcFile.readerOptions(conf).filesystem(fs) @@ -210,8 +209,7 @@ class OrcFileFormat val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) if (metaCacheEnabled) { - val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) - .asInstanceOf[OrcFileMeta].tail + val tail = OrcFileMeta.readTailFromCache(filePath, conf) batchReader.setCachedTail(tail) } batchReader.initialize(fileSplit, taskAttemptContext) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala index bee6a9c55cac0..173d77b421954 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path import org.apache.orc.OrcFile import org.apache.orc.impl.{OrcTail, ReaderImpl} -import 
org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaKey} +import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaCacheManager, FileMetaKey} import org.apache.spark.util.Utils private[sql] case class OrcFileMetaKey(path: Path, configuration: Configuration) @@ -40,6 +40,12 @@ private[sql] object OrcFileMeta { new OrcFileMeta(fileReader.getOrcTail) } } + + def readTailFromCache(path: Path, conf: Configuration): OrcTail = + readTailFromCache(OrcFileMetaKey(path, conf)) + + def readTailFromCache(key: OrcFileMetaKey): OrcTail = + FileMetaCacheManager.get(key).asInstanceOf[OrcFileMeta].tail } private[sql] class ForTailCacheReader(path: Path, options: OrcFile.ReaderOptions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 95b205162200e..dd3e7a59c3a4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -271,8 +271,7 @@ class ParquetFileFormat sharedConf.getBoolean(SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key, false) lazy val footerFileMetaData = if (metaCacheEnabled) { - FileMetaCacheManager.get(ParquetFileMetaKey(filePath, sharedConf)) - .asInstanceOf[ParquetFileMeta].footer.getFileMetaData + ParquetFileMeta.readFooterFromCache(filePath, sharedConf).getFileMetaData } else { ParquetFooterReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData } @@ -339,10 +338,8 @@ class ParquetFileFormat capacity) // Set footer before initialize. if (metaCacheEnabled) { - val fileMeta = FileMetaCacheManager - .get(ParquetFileMetaKey(filePath, sharedConf)) - .asInstanceOf[ParquetFileMeta] - vectorizedReader.setCachedFooter(fileMeta.footer) + val footer = ParquetFileMeta.readFooterFromCache(filePath, sharedConf) + vectorizedReader.setCachedFooter(footer) } val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion listener before `initialization`. 
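Beyond Parquet and ORC, the cache itself is format-agnostic: any reader can participate by pairing a FileMetaKey with a FileMeta, as the scaladoc added in PATCH 20 describes. The snippet below is only an illustrative sketch of that extension point, not code from this patch series: the CSV names, the header-line payload, and the package placement (needed because the traits are private[sql]) are all assumptions.

package org.apache.spark.sql.execution.datasources.csv

import java.io.{BufferedReader, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaCacheManager, FileMetaKey}

// Hypothetical cached metadata: the header line of a CSV file.
private[sql] case class CsvHeaderMeta(header: String) extends FileMeta

private[sql] case class CsvHeaderMetaKey(path: Path, configuration: Configuration)
  extends FileMetaKey {
  // Invoked by FileMetaCacheManager's cache loader only when the entry is missing or expired.
  override def getFileMeta: CsvHeaderMeta = {
    val in = path.getFileSystem(configuration).open(path)
    try {
      val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
      CsvHeaderMeta(Option(reader.readLine()).getOrElse(""))
    } finally {
      in.close()
    }
  }
}

private[sql] object CsvHeaderMeta {
  // Later lookups for the same path are served from the cache until the TTL elapses,
  // mirroring the readTailFromCache/readFooterFromCache helpers in this patch.
  def readHeaderFromCache(path: Path, conf: Configuration): String =
    FileMetaCacheManager.get(CsvHeaderMetaKey(path, conf)).asInstanceOf[CsvHeaderMeta].header
}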
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala index b5b0a68a83ae6..51d06ae81230b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala @@ -23,7 +23,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.hadoop.metadata.ParquetMetadata -import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaKey} +import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaCacheManager, FileMetaKey} private[sql] case class ParquetFileMetaKey(path: Path, configuration: Configuration) extends FileMetaKey { @@ -36,4 +36,10 @@ private[sql] object ParquetFileMeta { def apply(path: Path, conf: Configuration): ParquetFileMeta = { new ParquetFileMeta(ParquetFileReader.readFooter(conf, path, NO_FILTER)) } + + def readFooterFromCache(path: Path, conf: Configuration): ParquetMetadata = + readFooterFromCache(ParquetFileMetaKey(path, conf)) + + def readFooterFromCache(key: ParquetFileMetaKey): ParquetMetadata = + FileMetaCacheManager.get(key).asInstanceOf[ParquetFileMeta].footer } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index c27ced850202f..2e1d8ef2892d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -30,8 +30,8 @@ import org.apache.orc.mapreduce.OrcInputFormat import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} -import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, PartitionedFile} -import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFileMeta, OrcFileMetaKey, OrcFilters, OrcUtils} +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFileMeta, OrcFilters, OrcUtils} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter @@ -90,8 +90,7 @@ case class OrcPartitionReaderFactory( val fs = filePath.getFileSystem(conf) val readerOptions = if (orcMetaCacheEnabled) { - val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) - .asInstanceOf[OrcFileMeta].tail + val tail = OrcFileMeta.readTailFromCache(filePath, conf) OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) } else { OrcFile.readerOptions(conf).filesystem(fs) @@ -143,8 +142,7 @@ case class OrcPartitionReaderFactory( val fs = filePath.getFileSystem(conf) val readerOptions = if (orcMetaCacheEnabled) { - val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) - .asInstanceOf[OrcFileMeta].tail + val tail = OrcFileMeta.readTailFromCache(filePath, conf) OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) } else { OrcFile.readerOptions(conf).filesystem(fs) @@ 
-172,8 +170,7 @@ case class OrcPartitionReaderFactory( val batchReader = new OrcColumnarBatchReader(capacity) if (orcMetaCacheEnabled) { - val tail = FileMetaCacheManager.get(OrcFileMetaKey(filePath, conf)) - .asInstanceOf[OrcFileMeta].tail + val tail = OrcFileMeta.readTailFromCache(filePath, conf) batchReader.setCachedTail(tail) } batchReader.initialize(fileSplit, taskAttemptContext) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 60bbca9f2bcee..555b6e5f2143e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} -import org.apache.spark.sql.execution.datasources.{DataSourceUtils, FileMetaCacheManager, PartitionedFile, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile, RecordReaderIterator} import org.apache.spark.sql.execution.datasources.parquet._ import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf @@ -133,8 +133,7 @@ case class ParquetPartitionReaderFactory( val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) lazy val footerFileMetaData = if (parquetMetaCacheEnabled) { - FileMetaCacheManager.get(ParquetFileMetaKey(filePath, conf)) - .asInstanceOf[ParquetFileMeta].footer.getFileMetaData + ParquetFileMeta.readFooterFromCache(filePath, conf).getFileMetaData } else { ParquetFooterReader.readFooter(conf, filePath, SKIP_ROW_GROUPS).getFileMetaData } @@ -256,10 +255,9 @@ case class ParquetPartitionReaderFactory( capacity) // Set footer before initialize. if (parquetMetaCacheEnabled) { - val fileMeta = FileMetaCacheManager - .get(ParquetFileMetaKey(split.getPath, hadoopAttemptContext.getConfiguration)) - .asInstanceOf[ParquetFileMeta] - vectorizedReader.setCachedFooter(fileMeta.footer) + val fileMeta = + ParquetFileMeta.readFooterFromCache(split.getPath, hadoopAttemptContext.getConfiguration) + vectorizedReader.setCachedFooter(fileMeta) } val iter = new RecordReaderIterator(vectorizedReader) // SPARK-23457 Register a task completion listener before `initialization`. 
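The hunks above gate every Parquet footer read on the metadata-cache flag: when it is on, the footer comes from FileMetaCacheManager through ParquetFileMeta.readFooterFromCache and is also handed to the vectorized reader via setCachedFooter; when it is off, the footer is parsed per scan. A minimal sketch of that decision follows; it is not part of the patch, and the object name FooterLookupExample and the metaCacheEnabled parameter are invented here, standing in for spark.sql.parquet.metadataCache.enabled.

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ParquetMetadata

// Sketch only: FooterLookupExample and metaCacheEnabled are hypothetical names;
// the flag plays the role of spark.sql.parquet.metadataCache.enabled.
object FooterLookupExample {
  def footerFor(path: Path, conf: Configuration, metaCacheEnabled: Boolean): ParquetMetadata = {
    if (metaCacheEnabled) {
      // Served through FileMetaCacheManager: the parsed footer is loaded once
      // per (path, configuration) key and reused by later scans of the same file.
      ParquetFileMeta.readFooterFromCache(path, conf)
    } else {
      // Direct read, mirroring ParquetFileMeta.apply; the reader factory's
      // non-cached path instead skips row-group metadata via SKIP_ROW_GROUPS.
      ParquetFileReader.readFooter(conf, path, NO_FILTER)
    }
  }
}

Both branches return the same ParquetMetadata, so callers such as the reader factories above keep the same shape whichever way the flag is set.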
From fa75a95b061471081e1f5fadd7f42d1f7f492596 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 16 Aug 2021 15:15:58 +0800 Subject: [PATCH 28/30] to parquet only pr --- .../apache/spark/sql/internal/SQLConf.scala | 9 ---- .../orc/OrcColumnarBatchReader.java | 10 +--- .../datasources/orc/OrcFileFormat.scala | 14 +---- .../datasources/orc/OrcFileMeta.scala | 54 ------------------- .../v2/orc/OrcPartitionReaderFactory.scala | 21 ++------ .../datasources/orc/OrcQuerySuite.scala | 33 ------------ 6 files changed, 5 insertions(+), 136 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f4e0c4c714b0e..09fb414fe429d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -974,13 +974,6 @@ object SQLConf { .booleanConf .createWithDefault(false) - val FILE_META_CACHE_ORC_ENABLED = buildConf("spark.sql.fileMetaCache.orc.enabled") - .doc("To indicate if enable orc file meta cache, it is recommended to enabled " + - "this config when multiple queries are performed on the same dataset, default is false.") - .version("3.3.0") - .booleanConf - .createWithDefault(false) - val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS = buildConf("spark.sql.fileMetaCache.ttlSinceLastAccess") .version("3.3.0") @@ -3623,8 +3616,6 @@ class SQLConf extends Serializable with Logging { def fileMetaCacheParquetEnabled: Boolean = getConf(FILE_META_CACHE_PARQUET_ENABLED) - def fileMetaCacheOrcEnabled: Boolean = getConf(FILE_META_CACHE_ORC_ENABLED) - def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java index d911ae792327f..40ed0b2454c12 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java @@ -29,7 +29,6 @@ import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.TypeDescription; -import org.apache.orc.impl.OrcTail; import org.apache.orc.mapred.OrcInputFormat; import org.apache.spark.sql.catalyst.InternalRow; @@ -75,8 +74,6 @@ public class OrcColumnarBatchReader extends RecordReader { // The wrapped ORC column vectors. 
private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers; - private OrcTail cachedTail; - public OrcColumnarBatchReader(int capacity) { this.capacity = capacity; } @@ -127,8 +124,7 @@ public void initialize( fileSplit.getPath(), OrcFile.readerOptions(conf) .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) - .filesystem(fileSplit.getPath().getFileSystem(conf)) - .orcTail(cachedTail)); + .filesystem(fileSplit.getPath().getFileSystem(conf))); Reader.Options options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength()); recordReader = reader.rows(options); @@ -193,10 +189,6 @@ public void initBatch( columnarBatch = new ColumnarBatch(orcVectorWrappers); } - public void setCachedTail(OrcTail cachedTail) { - this.cachedTail = cachedTail; - } - /** * Return true if there exists more data in the next batch. If exists, prepare the next batch * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index 6f75450e86c99..939743adc0c0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -155,18 +154,11 @@ class OrcFileFormat (file: PartitionedFile) => { val conf = broadcastedConf.value.value - val metaCacheEnabled = - conf.getBoolean(SQLConf.FILE_META_CACHE_ORC_ENABLED.key, false) val filePath = new Path(new URI(file.filePath)) val fs = filePath.getFileSystem(conf) - val readerOptions = if (metaCacheEnabled) { - val tail = OrcFileMeta.readTailFromCache(filePath, conf) - OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) - } else { - OrcFile.readerOptions(conf).filesystem(fs) - } + val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( @@ -208,10 +200,6 @@ class OrcFileFormat val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) - if (metaCacheEnabled) { - val tail = OrcFileMeta.readTailFromCache(filePath, conf) - batchReader.setCachedTail(tail) - } batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( TypeDescription.fromString(resultSchemaString), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala deleted file mode 100644 index 173d77b421954..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.orc - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.orc.OrcFile -import org.apache.orc.impl.{OrcTail, ReaderImpl} - -import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaCacheManager, FileMetaKey} -import org.apache.spark.util.Utils - -private[sql] case class OrcFileMetaKey(path: Path, configuration: Configuration) - extends FileMetaKey { - override def getFileMeta: OrcFileMeta = OrcFileMeta(path, configuration) -} - -private[sql] case class OrcFileMeta(tail: OrcTail) extends FileMeta - -private[sql] object OrcFileMeta { - def apply(path: Path, conf: Configuration): OrcFileMeta = { - val fs = path.getFileSystem(conf) - val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) - Utils.tryWithResource(new ForTailCacheReader(path, readerOptions)) { fileReader => - new OrcFileMeta(fileReader.getOrcTail) - } - } - - def readTailFromCache(path: Path, conf: Configuration): OrcTail = - readTailFromCache(OrcFileMetaKey(path, conf)) - - def readTailFromCache(key: OrcFileMetaKey): OrcTail = - FileMetaCacheManager.get(key).asInstanceOf[OrcFileMeta].tail -} - -private[sql] class ForTailCacheReader(path: Path, options: OrcFile.ReaderOptions) - extends ReaderImpl(path, options) { - def getOrcTail: OrcTail = tail -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index 2e1d8ef2892d0..930adc08e77a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -31,7 +31,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFileMeta, OrcFilters, OrcUtils} +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter @@ -60,7 +60,6 @@ case class OrcPartitionReaderFactory( private val capacity = sqlConf.orcVectorizedReaderBatchSize private val orcFilterPushDown = sqlConf.orcFilterPushDown private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles - private val orcMetaCacheEnabled = sqlConf.fileMetaCacheOrcEnabled override def supportColumnarReads(partition: InputPartition): 
Boolean = { sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled && @@ -89,12 +88,7 @@ case class OrcPartitionReaderFactory( pushDownPredicates(filePath, conf) val fs = filePath.getFileSystem(conf) - val readerOptions = if (orcMetaCacheEnabled) { - val tail = OrcFileMeta.readTailFromCache(filePath, conf) - OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) - } else { - OrcFile.readerOptions(conf).filesystem(fs) - } + val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( @@ -141,12 +135,7 @@ case class OrcPartitionReaderFactory( pushDownPredicates(filePath, conf) val fs = filePath.getFileSystem(conf) - val readerOptions = if (orcMetaCacheEnabled) { - val tail = OrcFileMeta.readTailFromCache(filePath, conf) - OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) - } else { - OrcFile.readerOptions(conf).filesystem(fs) - } + val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( @@ -169,10 +158,6 @@ case class OrcPartitionReaderFactory( val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId) val batchReader = new OrcColumnarBatchReader(capacity) - if (orcMetaCacheEnabled) { - val tail = OrcFileMeta.readTailFromCache(filePath, conf) - batchReader.setCachedTail(tail) - } batchReader.initialize(fileSplit, taskAttemptContext) val requestedPartitionColIds = Array.fill(readDataSchema.length)(-1) ++ Range(0, partitionSchema.length) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 9c578efd8af21..15d24f260cd98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -742,39 +742,6 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession { } } } - - test("SPARK-33449: simple select queries with file meta cache") { - withSQLConf(SQLConf.FILE_META_CACHE_ORC_ENABLED.key -> "true") { - val tableName = "orc_use_meta_cache" - withTable(tableName) { - (0 until 10).map(i => (i, i.toString)).toDF("id", "value") - .write.format("orc").saveAsTable(tableName) - try { - val statsBeforeQuery = FileMetaCacheManager.cacheStats - checkAnswer(sql(s"SELECT id FROM $tableName where id > 5"), - (6 until 10).map(Row.apply(_))) - val statsAfterQuery1 = FileMetaCacheManager.cacheStats - // The 1st query triggers 4 times file meta read: 2 times related to - // push down filter and 2 times related to file read. The 1st query - // run twice: df.collect() and df.rdd.count(), so it triggers 8 times - // file meta read in total. missCount is 2 because cache is empty and - // 2 meta files need load, other 6 times will read meta from cache. - assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2) - assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6) - checkAnswer(sql(s"SELECT id FROM $tableName where id < 5"), - (0 until 5).map(Row.apply(_))) - val statsAfterQuery2 = FileMetaCacheManager.cacheStats - // The 2nd query also triggers 8 times file meta read in total and - // all read from meta cache, so missCount no growth and hitCount - // increase 8 times. 
- assert(statsAfterQuery2.missCount() - statsAfterQuery1.missCount() == 0) - assert(statsAfterQuery2.hitCount() - statsAfterQuery1.hitCount() == 8) - } finally { - FileMetaCacheManager.cleanUp() - } - } - } - } } class OrcV1QuerySuite extends OrcQuerySuite { From 4c022d742771f866866fca4615b6126e86bdca2a Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 16 Aug 2021 15:39:08 +0800 Subject: [PATCH 29/30] remove unused import --- .../spark/sql/execution/datasources/orc/OrcQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala index 15d24f260cd98..680c2cf2b42e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala @@ -35,7 +35,7 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.datasources.{FileMetaCacheManager, HadoopFsRelation, LogicalRelation, RecordReaderIterator} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, RecordReaderIterator} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession From 104b1256a23300b7f7912c7bf37fc7b14ac5099c Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Mon, 16 Aug 2021 17:11:37 +0800 Subject: [PATCH 30/30] remove private sql and add comments --- .../execution/datasources/FileMetaCacheManager.scala | 10 +++++++--- .../datasources/parquet/ParquetFileMeta.scala | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala index 9c90eb3a3804b..3d1016d04b4ce 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.internal.SQLConf * of the `FileMetaKey` and the `getFileMeta` method of `FileMetaKey` is used to return the file * meta of the corresponding file format. */ -private[sql] object FileMetaCacheManager extends Logging { +object FileMetaCacheManager extends Logging { private lazy val cacheLoader = new CacheLoader[FileMetaKey, FileMeta]() { override def load(entry: FileMetaKey): FileMeta = { @@ -56,6 +56,10 @@ private[sql] object FileMetaCacheManager extends Logging { .recordStats() .build[FileMetaKey, FileMeta](cacheLoader) + /** + * Returns the `FileMeta` associated with the `FileMetaKey` in the `FileMetaCacheManager`, + * obtaining the `FileMeta` from `cacheLoader.load(FileMetaKey)` if necessary.
+ */ def get(dataFile: FileMetaKey): FileMeta = cache.get(dataFile) /** @@ -69,7 +73,7 @@ private[sql] object FileMetaCacheManager extends Logging { def cleanUp(): Unit = cache.cleanUp() } -private[sql] abstract class FileMetaKey { +abstract class FileMetaKey { def path: Path def configuration: Configuration def getFileMeta: FileMeta @@ -80,4 +84,4 @@ private[sql] abstract class FileMetaKey { } } -private[sql] trait FileMeta +trait FileMeta diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala index 51d06ae81230b..2042c33194fa2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileMeta.scala @@ -25,14 +25,14 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaCacheManager, FileMetaKey} -private[sql] case class ParquetFileMetaKey(path: Path, configuration: Configuration) +case class ParquetFileMetaKey(path: Path, configuration: Configuration) extends FileMetaKey { override def getFileMeta: ParquetFileMeta = ParquetFileMeta(path, configuration) } -private[sql] class ParquetFileMeta(val footer: ParquetMetadata) extends FileMeta +class ParquetFileMeta(val footer: ParquetMetadata) extends FileMeta -private[sql] object ParquetFileMeta { +object ParquetFileMeta { def apply(path: Path, conf: Configuration): ParquetFileMeta = { new ParquetFileMeta(ParquetFileReader.readFooter(conf, path, NO_FILTER)) }
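PATCH 28 removes the ORC half of the feature together with the only end-to-end test of FileMetaCacheManager. For reference, a Parquet analogue of that deleted test as a hedged sketch, not part of the series: the suite, test, and table names are invented, and it assumes FileMetaCacheManager.cacheStats exposes the same Guava hit/miss counters the deleted ORC test relied on.

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.datasources.FileMetaCacheManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite mirroring the ORC test removed in PATCH 28, adapted to
// the Parquet footer cache that the series keeps.
class ParquetFileMetaCacheSuite extends QueryTest with SharedSparkSession {

  test("Parquet footer cache hit/miss accounting (illustrative)") {
    withSQLConf(SQLConf.FILE_META_CACHE_PARQUET_ENABLED.key -> "true") {
      val tableName = "parquet_use_meta_cache"
      withTable(tableName) {
        spark.range(0, 10).selectExpr("id", "cast(id as string) as value")
          .write.format("parquet").saveAsTable(tableName)
        try {
          val before = FileMetaCacheManager.cacheStats
          sql(s"SELECT id FROM $tableName WHERE id > 5").collect()
          val afterFirst = FileMetaCacheManager.cacheStats
          // A cold cache misses once per data file on the first scan.
          assert(afterFirst.missCount() > before.missCount())
          sql(s"SELECT id FROM $tableName WHERE id < 5").collect()
          val afterSecond = FileMetaCacheManager.cacheStats
          // A second scan over the same files is served from the cache:
          // hits grow while misses stay flat.
          assert(afterSecond.missCount() == afterFirst.missCount())
          assert(afterSecond.hitCount() > afterFirst.hitCount())
        } finally {
          FileMetaCacheManager.cleanUp()
        }
      }
    }
  }
}

If spark.sql.fileMetaCache.ttlSinceLastAccess expired entries between the two scans, the flat-miss assertion would not hold, which is why the exact counts asserted in the deleted ORC test are not reproduced here.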