diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 9f71ecb756a6d..2bc34e5f6f707 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -963,6 +963,36 @@ object SQLConf { .booleanConf .createWithDefault(false) + val FILE_META_CACHE_ENABLED_SOURCE_LIST = buildConf("spark.sql.fileMetaCache.enabledSourceList") + .doc("A comma-separated list of data source short names for which data source enabled file " + + "meta cache, now the file meta cache only support ORC, it is recommended to enabled this " + + "config when multiple queries are performed on the same dataset, default is false." + + "Warning: if the fileMetaCache is enabled, the existing data files should not be " + + "replaced with the same file name, otherwise there will be a risk of job failure or wrong " + + "data reading before the cache entry expires.") + .version("3.3.0") + .stringConf + .checkValue(value => { + val valueList = value.toLowerCase(Locale.ROOT).split(",").map(_.trim) + value.trim.isEmpty || valueList.length == 1 && valueList.contains("orc") + }, s"spark.sql.fileMetaCache.enabledSourceList only support orc now") + .createWithDefault("") + + val FILE_META_CACHE_TTL_SINCE_LAST_ACCESS_SEC = + buildConf("spark.sql.fileMetaCache.ttlSinceLastAccessSec") + .version("3.3.0") + .doc("Time-to-live for file metadata cache entry after last access, the unit is seconds.") + .timeConf(TimeUnit.SECONDS) + .createWithDefault(600L) + + val FILE_META_CACHE_MAXIMUM_SIZE = + buildConf("spark.sql.fileMetaCache.maximumSize") + .version("3.3.0") + .doc("Maximum number of file meta entries the file meta cache contains.") + .intConf + .checkValue(_ > 0, "The value of fileMetaCache maximumSize must be positive") + .createWithDefault(1000) + val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath") .doc("When true, check all the partition paths under the table\'s root directory " + "when reading data stored in HDFS. This configuration will be deprecated in the future " + @@ -3621,6 +3651,12 @@ class SQLConf extends Serializable with Logging { def parquetVectorizedReaderBatchSize: Int = getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE) + def fileMetaCacheEnabled(ds: String): Boolean = { + val enabledList = getConf(FILE_META_CACHE_ENABLED_SOURCE_LIST).toLowerCase(Locale.ROOT) + .split(",").map(_.trim) + enabledList.contains(ds.toLowerCase(Locale.ROOT)) + } + def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED) diff --git a/sql/core/benchmarks/FileMetaCacheReadBenchmark-jdk11-results.txt b/sql/core/benchmarks/FileMetaCacheReadBenchmark-jdk11-results.txt new file mode 100644 index 0000000000000..8c50285a7d35f --- /dev/null +++ b/sql/core/benchmarks/FileMetaCacheReadBenchmark-jdk11-results.txt @@ -0,0 +1,95 @@ +================================================================================================ +count(*) From 100 files +================================================================================================ + +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 10 columns with 100 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 217 225 5 24.1 41.5 1.0X +count(*): fileMetaCacheEnabled = true 153 156 2 34.3 29.1 1.4X +count(*) with Filter: fileMetaCacheEnabled = false 436 444 7 12.0 83.1 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 377 379 2 13.9 72.0 0.6X + +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 50 columns with 100 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 221 239 16 23.7 42.2 1.0X +count(*): fileMetaCacheEnabled = true 173 183 8 30.2 33.1 1.3X +count(*) with Filter: fileMetaCacheEnabled = false 494 496 2 10.6 94.3 0.4X +count(*) with Filter: fileMetaCacheEnabled = true 431 433 2 12.2 82.2 0.5X + +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 100 columns with 100 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 287 289 2 18.3 54.8 1.0X +count(*): fileMetaCacheEnabled = true 221 228 6 23.8 42.1 1.3X +count(*) with Filter: fileMetaCacheEnabled = false 553 555 2 9.5 105.4 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 504 506 2 10.4 96.1 0.6X + + +================================================================================================ +count(*) From 500 files +================================================================================================ + +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 10 columns with 500 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 772 814 72 6.8 147.3 1.0X +count(*): fileMetaCacheEnabled = true 534 537 2 9.8 101.9 1.4X +count(*) with Filter: fileMetaCacheEnabled = false 1341 1343 3 3.9 255.8 0.6X +count(*) with Filter: fileMetaCacheEnabled = true 1115 1116 1 4.7 212.7 0.7X + +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 50 columns with 500 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 793 881 117 6.6 151.3 1.0X +count(*): fileMetaCacheEnabled = true 564 569 4 9.3 107.6 1.4X +count(*) with Filter: fileMetaCacheEnabled = false 1473 1475 3 3.6 281.0 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 1253 1254 1 4.2 238.9 0.6X + +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 100 columns with 500 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 862 902 45 6.1 164.4 1.0X +count(*): fileMetaCacheEnabled = true 623 631 9 8.4 118.9 1.4X +count(*) with Filter: fileMetaCacheEnabled = false 1695 1698 4 3.1 323.3 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 1437 1445 11 3.6 274.1 0.6X + + +================================================================================================ +count(*) From 1000 files +================================================================================================ + +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 10 columns with 1000 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 1459 1501 59 3.6 278.3 1.0X +count(*): fileMetaCacheEnabled = true 1091 1092 1 4.8 208.0 1.3X +count(*) with Filter: fileMetaCacheEnabled = false 2518 2520 3 2.1 480.2 0.6X +count(*) with Filter: fileMetaCacheEnabled = true 2122 2130 11 2.5 404.7 0.7X + +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 50 columns with 1000 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 1505 1506 1 3.5 287.0 1.0X +count(*): fileMetaCacheEnabled = true 1138 1138 1 4.6 217.1 1.3X +count(*) with Filter: fileMetaCacheEnabled = false 2787 2798 16 1.9 531.5 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 2405 2405 1 2.2 458.7 0.6X + +OpenJDK 64-Bit Server VM 11.0.12+7-LTS on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 100 columns with 1000 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 1610 1610 1 3.3 307.0 1.0X +count(*): fileMetaCacheEnabled = true 1299 1308 13 4.0 247.7 1.2X +count(*) with Filter: fileMetaCacheEnabled = false 3121 3123 3 1.7 595.4 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 2828 2828 1 1.9 539.3 0.6X + diff --git a/sql/core/benchmarks/FileMetaCacheReadBenchmark-results.txt b/sql/core/benchmarks/FileMetaCacheReadBenchmark-results.txt new file mode 100644 index 0000000000000..8dc820a1cbcfe --- /dev/null +++ b/sql/core/benchmarks/FileMetaCacheReadBenchmark-results.txt @@ -0,0 +1,95 @@ +================================================================================================ +count(*) From 100 files +================================================================================================ + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 10 columns with 100 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 190 196 8 27.6 36.2 1.0X +count(*): fileMetaCacheEnabled = true 134 138 5 39.2 25.5 1.4X +count(*) with Filter: fileMetaCacheEnabled = false 377 384 8 13.9 72.0 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 328 333 6 16.0 62.6 0.6X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 50 columns with 100 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 187 192 8 28.0 35.7 1.0X +count(*): fileMetaCacheEnabled = true 146 150 6 35.9 27.9 1.3X +count(*) with Filter: fileMetaCacheEnabled = false 396 400 7 13.2 75.5 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 351 355 5 14.9 67.0 0.5X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 100 columns with 100 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 237 241 6 22.1 45.2 1.0X +count(*): fileMetaCacheEnabled = true 192 197 6 27.3 36.6 1.2X +count(*) with Filter: fileMetaCacheEnabled = false 465 471 8 11.3 88.8 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 422 426 7 12.4 80.5 0.6X + + +================================================================================================ +count(*) From 500 files +================================================================================================ + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 10 columns with 500 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 647 656 6 8.1 123.4 1.0X +count(*): fileMetaCacheEnabled = true 431 437 7 12.2 82.3 1.5X +count(*) with Filter: fileMetaCacheEnabled = false 1157 1160 5 4.5 220.7 0.6X +count(*) with Filter: fileMetaCacheEnabled = true 934 947 11 5.6 178.2 0.7X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 50 columns with 500 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 673 684 9 7.8 128.5 1.0X +count(*): fileMetaCacheEnabled = true 461 468 9 11.4 87.9 1.5X +count(*) with Filter: fileMetaCacheEnabled = false 1277 1280 5 4.1 243.5 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 1052 1066 20 5.0 200.6 0.6X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 100 columns with 500 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 720 726 11 7.3 137.3 1.0X +count(*): fileMetaCacheEnabled = true 503 509 10 10.4 96.0 1.4X +count(*) with Filter: fileMetaCacheEnabled = false 1468 1469 1 3.6 280.0 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 1232 1238 9 4.3 234.9 0.6X + + +================================================================================================ +count(*) From 1000 files +================================================================================================ + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 10 columns with 1000 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 1239 1245 9 4.2 236.3 1.0X +count(*): fileMetaCacheEnabled = true 995 996 2 5.3 189.7 1.2X +count(*) with Filter: fileMetaCacheEnabled = false 2161 2169 12 2.4 412.1 0.6X +count(*) with Filter: fileMetaCacheEnabled = true 1864 1865 1 2.8 355.5 0.7X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 50 columns with 1000 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 1292 1294 3 4.1 246.5 1.0X +count(*): fileMetaCacheEnabled = true 1086 1097 16 4.8 207.2 1.2X +count(*) with Filter: fileMetaCacheEnabled = false 2388 2396 12 2.2 455.4 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 2176 2177 0 2.4 415.1 0.6X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Linux 4.14.0_1-0-0-42 +Intel(R) Xeon(R) Gold 6271C CPU @ 2.60GHz +count(*) from 100 columns with 1000 files: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------------- +count(*): fileMetaCacheEnabled = false 1371 1372 2 3.8 261.5 1.0X +count(*): fileMetaCacheEnabled = true 1084 1096 17 4.8 206.7 1.3X +count(*) with Filter: fileMetaCacheEnabled = false 2698 2708 13 1.9 514.7 0.5X +count(*) with Filter: fileMetaCacheEnabled = true 2408 2408 0 2.2 459.2 0.6X + diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java index 40ed0b2454c12..d911ae792327f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java @@ -29,6 +29,7 @@ import org.apache.orc.OrcFile; import org.apache.orc.Reader; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.OrcTail; import org.apache.orc.mapred.OrcInputFormat; import org.apache.spark.sql.catalyst.InternalRow; @@ -74,6 +75,8 @@ public class OrcColumnarBatchReader extends RecordReader { // The wrapped ORC column vectors. private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers; + private OrcTail cachedTail; + public OrcColumnarBatchReader(int capacity) { this.capacity = capacity; } @@ -124,7 +127,8 @@ public void initialize( fileSplit.getPath(), OrcFile.readerOptions(conf) .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) - .filesystem(fileSplit.getPath().getFileSystem(conf))); + .filesystem(fileSplit.getPath().getFileSystem(conf)) + .orcTail(cachedTail)); Reader.Options options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength()); recordReader = reader.rows(options); @@ -189,6 +193,10 @@ public void initBatch( columnarBatch = new ColumnarBatch(orcVectorWrappers); } + public void setCachedTail(OrcTail cachedTail) { + this.cachedTail = cachedTail; + } + /** * Return true if there exists more data in the next batch. If exists, prepare the next batch * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala new file mode 100644 index 0000000000000..9d3f5a0a885dc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileMetaCacheManager.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.util.concurrent.TimeUnit + +import com.google.common.cache.{CacheBuilder, CacheLoader, CacheStats} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.internal.SQLConf + +/** + * A singleton Cache Manager to caching file meta. We cache these file metas in order to speed up + * iterated queries over the same dataset. Otherwise, each query would have to hit remote storage + * in order to fetch file meta before read files. + * + * We should implement the corresponding `FileMetaKey` for a specific file format, for example + * `ParquetFileMetaKey` or `OrcFileMetaKey`. By default, the file path is used as the identification + * of the `FileMetaKey` and the `getFileMeta` method of `FileMetaKey` is used to return the file + * meta of the corresponding file format. + */ +object FileMetaCacheManager extends Logging { + + private lazy val cacheLoader = new CacheLoader[FileMetaKey, FileMeta]() { + override def load(fileMetaKey: FileMetaKey): FileMeta = { + logDebug(s"Loading Data File Meta ${fileMetaKey.path}") + fileMetaKey.getFileMeta + } + } + + private lazy val ttlTime = + SparkEnv.get.conf.get(SQLConf.FILE_META_CACHE_TTL_SINCE_LAST_ACCESS_SEC) + + private lazy val maximumSize = + SparkEnv.get.conf.get(SQLConf.FILE_META_CACHE_MAXIMUM_SIZE) + + private lazy val cache = CacheBuilder + .newBuilder() + .expireAfterAccess(ttlTime, TimeUnit.SECONDS) + .maximumSize(maximumSize) + .recordStats() + .build[FileMetaKey, FileMeta](cacheLoader) + + /** + * Returns the `FileMeta` associated with the `FileMetaKey` in the `FileMetaCacheManager`, + * obtaining that the `FileMeta` from `cacheLoader.load(FileMetaKey)` if necessary. + */ + def get(fileMeteKey: FileMetaKey): FileMeta = cache.get(fileMeteKey) + + /** + * Return current snapshot of FileMeta Cache's cumulative statistics + * include cache hitCount, missCount and so on. + * This method is only called when testing now. + */ + private def cacheStats: CacheStats = cache.stats() + + /** + * Use to cleanUp entries in the FileMeta Cache. + * This method is only called when testing now. + */ + private def cleanUp(): Unit = cache.cleanUp() +} + +abstract class FileMetaKey { + def path: Path + def configuration: Configuration + def getFileMeta: FileMeta + override def hashCode(): Int = path.hashCode + override def equals(other: Any): Boolean = other match { + case key: FileMetaKey => + this.getClass.equals(other.getClass) && path.equals(key.path) + case _ => false + } +} + +trait FileMeta diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala index c4ffdb402fab6..429ab094cd544 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala @@ -144,6 +144,7 @@ class OrcFileFormat val sqlConf = sparkSession.sessionState.conf val enableVectorizedReader = supportBatch(sparkSession, resultSchema) val capacity = sqlConf.orcVectorizedReaderBatchSize + val metaCacheEnabled = sqlConf.fileMetaCacheEnabled(shortName()) OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(hadoopConf, sqlConf.caseSensitiveAnalysis) @@ -159,7 +160,12 @@ class OrcFileFormat val filePath = new Path(new URI(file.filePath)) val fs = filePath.getFileSystem(conf) - val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) + val readerOptions = if (metaCacheEnabled) { + val tail = OrcFileMeta.readTailFromCache(filePath, conf) + OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) + } else { + OrcFile.readerOptions(conf).filesystem(fs) + } val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( @@ -201,6 +207,10 @@ class OrcFileFormat val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1) val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) + if (metaCacheEnabled) { + val tail = OrcFileMeta.readTailFromCache(filePath, conf) + batchReader.setCachedTail(tail) + } batchReader.initialize(fileSplit, taskAttemptContext) batchReader.initBatch( TypeDescription.fromString(resultSchemaString), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala new file mode 100644 index 0000000000000..9f685da2d3943 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileMeta.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.orc.OrcFile +import org.apache.orc.impl.{OrcTail, ReaderImpl} + +import org.apache.spark.sql.execution.datasources.{FileMeta, FileMetaCacheManager, FileMetaKey} +import org.apache.spark.util.Utils + +case class OrcFileMetaKey(path: Path, configuration: Configuration) + extends FileMetaKey { + override def getFileMeta: OrcFileMeta = OrcFileMeta(path, configuration) +} + +case class OrcFileMeta(tail: OrcTail) extends FileMeta + +object OrcFileMeta { + def apply(path: Path, conf: Configuration): OrcFileMeta = { + val fs = path.getFileSystem(conf) + val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) + Utils.tryWithResource(new ReaderImpl(path, readerOptions)) { fileReader => + new OrcFileMeta(new OrcTail(fileReader.getFileTail, fileReader.getSerializedFileFooter)) + } + } + + def readTailFromCache(path: Path, conf: Configuration): OrcTail = + FileMetaCacheManager.get(OrcFileMetaKey(path, conf)).asInstanceOf[OrcFileMeta].tail +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index c5020cb79524c..c8b2b480e564b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader} import org.apache.spark.sql.execution.WholeStageCodegenExec import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFilters, OrcUtils} +import org.apache.spark.sql.execution.datasources.orc.{OrcColumnarBatchReader, OrcDeserializer, OrcFileMeta, OrcFilters, OrcUtils} import org.apache.spark.sql.execution.datasources.v2._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.Filter @@ -61,6 +61,7 @@ case class OrcPartitionReaderFactory( private val capacity = sqlConf.orcVectorizedReaderBatchSize private val orcFilterPushDown = sqlConf.orcFilterPushDown private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles + private val metaCacheEnabled = sqlConf.fileMetaCacheEnabled("orc") override def supportColumnarReads(partition: InputPartition): Boolean = { sqlConf.orcVectorizedReaderEnabled && sqlConf.wholeStageEnabled && @@ -89,7 +90,12 @@ case class OrcPartitionReaderFactory( pushDownPredicates(filePath, conf) val fs = filePath.getFileSystem(conf) - val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) + val readerOptions = if (metaCacheEnabled) { + val tail = OrcFileMeta.readTailFromCache(filePath, conf) + OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) + } else { + OrcFile.readerOptions(conf).filesystem(fs) + } val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( @@ -136,7 +142,12 @@ case class OrcPartitionReaderFactory( pushDownPredicates(filePath, conf) val fs = filePath.getFileSystem(conf) - val readerOptions = OrcFile.readerOptions(conf).filesystem(fs) + val readerOptions = if (metaCacheEnabled) { + val tail = OrcFileMeta.readTailFromCache(filePath, conf) + OrcFile.readerOptions(conf).filesystem(fs).orcTail(tail) + } else { + OrcFile.readerOptions(conf).filesystem(fs) + } val resultedColPruneInfo = Utils.tryWithResource(OrcFile.createReader(filePath, readerOptions)) { reader => OrcUtils.requestedColumnIds( @@ -159,6 +170,10 @@ case class OrcPartitionReaderFactory( val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId) val batchReader = new OrcColumnarBatchReader(capacity) + if (metaCacheEnabled) { + val tail = OrcFileMeta.readTailFromCache(filePath, conf) + batchReader.setCachedTail(tail) + } batchReader.initialize(fileSplit, taskAttemptContext) val requestedPartitionColIds = Array.fill(readDataSchema.length)(-1) ++ Range(0, partitionSchema.length) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileMetaCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileMetaCacheSuite.scala new file mode 100644 index 0000000000000..2a1620e3f268d --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileMetaCacheSuite.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +import com.google.common.cache.CacheStats +import org.scalatest.PrivateMethodTester + +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.datasources.FileMetaCacheManager +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession + +abstract class FileMetaCacheSuite extends QueryTest with SharedSparkSession + with PrivateMethodTester { + import testImplicits._ + + private val cacheStatsMethod = PrivateMethod[CacheStats](Symbol("cacheStats")) + private val cleanUpMethod = PrivateMethod[Unit](Symbol("cleanUp")) + + test("SPARK-36516: simple select queries with orc file meta cache") { + withSQLConf(SQLConf.FILE_META_CACHE_ENABLED_SOURCE_LIST.key -> "orc") { + val tableName = "orc_use_meta_cache" + withTable(tableName) { + (0 until 10).map(i => (i, i.toString)).toDF("id", "value") + .write.format("orc").saveAsTable(tableName) + try { + val statsBeforeQuery = FileMetaCacheManager.invokePrivate(cacheStatsMethod()) + checkAnswer(sql(s"SELECT id FROM $tableName where id > 5"), + (6 until 10).map(Row.apply(_))) + val statsAfterQuery1 = FileMetaCacheManager.invokePrivate(cacheStatsMethod()) + // The 1st query triggers 4 times file meta read: 2 times related to + // push down filter and 2 times related to file read. The 1st query + // run twice: df.collect() and df.rdd.count(), so it triggers 8 times + // file meta read in total. missCount is 2 because cache is empty and + // 2 meta files need load, other 6 times will read meta from cache. + assert(statsAfterQuery1.missCount() - statsBeforeQuery.missCount() == 2) + assert(statsAfterQuery1.hitCount() - statsBeforeQuery.hitCount() == 6) + checkAnswer(sql(s"SELECT id FROM $tableName where id < 5"), + (0 until 5).map(Row.apply(_))) + val statsAfterQuery2 = FileMetaCacheManager.invokePrivate(cacheStatsMethod()) + // The 2nd query also triggers 8 times file meta read in total and + // all read from meta cache, so missCount no growth and hitCount + // increase 8 times. + assert(statsAfterQuery2.missCount() - statsAfterQuery1.missCount() == 0) + assert(statsAfterQuery2.hitCount() - statsAfterQuery1.hitCount() == 8) + } finally { + FileMetaCacheManager.invokePrivate(cleanUpMethod()) + } + } + } + } +} + +class V1FileMetaCacheSuite extends FileMetaCacheSuite { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_LIST, "orc") +} + +class V2FileMetaCacheSuite extends FileMetaCacheSuite { + override protected def sparkConf: SparkConf = + super + .sparkConf + .set(SQLConf.USE_V1_SOURCE_LIST, "") +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FileMetaCacheReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FileMetaCacheReadBenchmark.scala new file mode 100644 index 0000000000000..6fa57f8f162a5 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FileMetaCacheReadBenchmark.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.benchmark + +import java.io.File + +import scala.util.Random + +import org.apache.spark.SparkConf +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row, SparkSession} +import org.apache.spark.sql.internal.SQLConf + +/** + * Benchmark to measure the performance of fileMetaCache in data source read. + * To run this benchmark: + * {{{ + * 1. without sbt: bin/spark-submit --class + * --jars , + * 2. build/sbt "sql/test:runMain " + * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/FileMetaCacheReadBenchmark-results.txt". + * }}} + */ +object FileMetaCacheReadBenchmark extends SqlBasedBenchmark { + + override def getSparkSession: SparkSession = { + val conf = new SparkConf() + .setAppName("FileMetaCacheReadBenchmark") + // Since `spark.master` always exists, overrides this value + .set("spark.master", "local[1]") + .setIfMissing("spark.driver.memory", "4g") + .setIfMissing("spark.executor.memory", "4g") + + val sparkSession = SparkSession.builder().config(conf).getOrCreate() + + // Set default configs. Individual cases will change them if necessary. + sparkSession.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") + sparkSession.conf.set(SQLConf.FILE_META_CACHE_TTL_SINCE_LAST_ACCESS_SEC.key, "5") + + sparkSession + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { + try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + private def prepareTable(dir: File, df: DataFrame, fileCount: Int): Unit = + saveAsOrcTable( df.repartition(fileCount).write, dir.getCanonicalPath + "/orc") + + private def saveAsOrcTable(df: DataFrameWriter[Row], dir: String): Unit = { + df.mode("overwrite").option("compression", "snappy").orc(dir) + spark.read.orc(dir).createOrReplaceTempView("orcTable") + } + + def countBenchmark(values: Int, width: Int, fileCount: Int): Unit = { + val benchmark = new Benchmark( + s"count(*) from $width columns with $fileCount files", + values, + output = output) + + withTempPath { dir => + withTempTable("t1", "orcTable") { + import spark.implicits._ + val selectExpr = (1 to width).map(i => s"value as c$i") + val dataFrame = spark.range(values).map(_ => Random.nextLong).toDF() + dataFrame.selectExpr(selectExpr: _*).createOrReplaceTempView("t1") + + prepareTable(dir, spark.sql("SELECT * FROM t1"), fileCount) + + val filter = { + val rows = + spark.sql(s"SELECT c1, count(*) FROM orcTable group by c1") + .collect() + .sortBy(r => r.getLong(1)) + rows.head.getLong(0) + } + + benchmark.addCase("count(*): fileMetaCacheEnabled = false") { _ => + spark.sql(s"SELECT count(*) FROM orcTable").noop() + } + + benchmark.addCase("count(*): fileMetaCacheEnabled = true") { _ => + withSQLConf(SQLConf.FILE_META_CACHE_ENABLED_SOURCE_LIST.key -> "orc") { + spark.sql(s"SELECT count(*) FROM orcTable").noop() + } + } + + benchmark.addCase("count(*) with Filter: fileMetaCacheEnabled = false") { _ => + spark.sql(s"SELECT count(*) FROM orcTable where c1 = $filter").noop() + } + + benchmark.addCase("count(*) with Filter: fileMetaCacheEnabled = true") { _ => + withSQLConf(SQLConf.FILE_META_CACHE_ENABLED_SOURCE_LIST.key -> "orc") { + spark.sql(s"SELECT count(*) FROM orcTable where c1 = $filter").noop() + } + } + + benchmark.run() + } + } + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + for (fileCount <- List(100, 500, 1000)) { + runBenchmark(s"count(*) From $fileCount files") { + for (columnWidth <- List(10, 50, 100)) { + countBenchmark(1024 * 1024 * 5, columnWidth, fileCount) + } + } + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala index 13f5778617b5c..bd52c08afda73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala @@ -485,4 +485,28 @@ class SQLConfSuite extends QueryTest with SharedSparkSession { |${nonInternalLegacyConfigs.map(_._1).mkString("\n")} |""".stripMargin) } + + test("SPARK-36516: verify the behavior of FILE_META_CACHE_ENABLED_SOURCE_LIST") { + val e1 = intercept[IllegalArgumentException] { + spark.conf.set(SQLConf.FILE_META_CACHE_ENABLED_SOURCE_LIST.key, "text") + } + assert(e1.getMessage.contains("spark.sql.fileMetaCache.enabledSourceList only support")) + + val e2 = intercept[IllegalArgumentException] { + spark.conf.set(SQLConf.FILE_META_CACHE_ENABLED_SOURCE_LIST.key, "text, orc") + } + assert(e2.getMessage.contains("spark.sql.fileMetaCache.enabledSourceList only support")) + + spark.conf.set(SQLConf.FILE_META_CACHE_ENABLED_SOURCE_LIST.key, " ") + assert(!spark.sessionState.conf.fileMetaCacheEnabled("orc")) + + spark.conf.set(SQLConf.FILE_META_CACHE_ENABLED_SOURCE_LIST.key, "orc") + assert(spark.sessionState.conf.fileMetaCacheEnabled("orc")) + + spark.conf.set(SQLConf.FILE_META_CACHE_ENABLED_SOURCE_LIST.key, "") + assert(!spark.sessionState.conf.fileMetaCacheEnabled("orc")) + + spark.conf.unset(SQLConf.FILE_META_CACHE_ENABLED_SOURCE_LIST.key) + assert(!spark.sessionState.conf.fileMetaCacheEnabled("orc")) + } }