diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b79857cdccd22..2b3f05f61b483 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import java.net.URI
 import java.util.Locale
 import java.util.concurrent.Callable
+import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
@@ -135,7 +136,16 @@ class SessionCatalog(
 
   private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
     val cacheSize = conf.tableRelationCacheSize
-    CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
+    val cacheTTL = conf.metadataCacheTTL
+
+    var builder = CacheBuilder.newBuilder()
+      .maximumSize(cacheSize)
+
+    if (cacheTTL > 0) {
+      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
+    }
+
+    builder.build[QualifiedTableName, LogicalPlan]()
   }
 
   /** This method provides a way to get a cached plan. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3149d14c1ddcc..90dc0b523908e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3251,6 +3251,8 @@ class SQLConf extends Serializable with Logging {
   def legacyAllowCastNumericToTimestamp: Boolean =
     getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP)
 
+  def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 9618ff6062635..9bc910b5e1cdf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.internal
 
 import java.util.Locale
+import java.util.concurrent.TimeUnit
 
 import org.apache.spark.util.Utils
 
@@ -226,4 +227,16 @@ object StaticSQLConf {
     .version("3.0.0")
     .intConf
     .createWithDefault(100)
+
+  val METADATA_CACHE_TTL_SECONDS = buildStaticConf("spark.sql.metadataCacheTTLSeconds")
+    .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " +
+      "session catalog cache. This configuration only takes effect when it is set to a " +
+      "positive value (> 0). It also requires setting " +
+      s"${StaticSQLConf.CATALOG_IMPLEMENTATION.key} to `hive`, setting " +
+      s"${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key} > 0 and setting " +
+      s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to `true` " +
+      "for the TTL to be applied to the partition file metadata cache.")
+    .version("3.1.0")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefault(-1)
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 4d88a8d7ee546..ad40cc010361c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually
+
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, Range, SubqueryAlias, View}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types._
 
 class InMemorySessionCatalogSuite extends SessionCatalogSuite {
@@ -45,7 +49,7 @@ class InMemorySessionCatalogSuite extends SessionCatalogSuite {
  * signatures but do not extend a common parent. This is largely by design but
 * unfortunately leads to very similar test code in two places.
 */
-abstract class SessionCatalogSuite extends AnalysisTest {
+abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
   protected val utils: CatalogTestUtils
 
   protected val isHiveExternalCatalog = false
@@ -70,6 +74,16 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       catalog.reset()
     }
   }
+
+  private def withConfAndEmptyCatalog(conf: SQLConf)(f: SessionCatalog => Unit): Unit = {
+    val catalog = new SessionCatalog(newEmptyCatalog(), new SimpleFunctionRegistry(), conf)
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    try {
+      f(catalog)
+    } finally {
+      catalog.reset()
+    }
+  }
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -1641,4 +1655,27 @@ abstract class SessionCatalogSuite extends AnalysisTest {
       assert(cause.cause.get.getMessage.contains("Actual error"))
     }
   }
+
+  test("expire table relation cache if TTL is configured") {
+    case class TestCommand() extends Command
+
+    val conf = new SQLConf()
+    conf.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
+
+    withConfAndEmptyCatalog(conf) { catalog =>
+      val table = QualifiedTableName(catalog.getCurrentDatabase, "test")
+
+      // First, make sure the test table is not cached.
+      assert(catalog.getCachedTable(table) === null)
+
+      catalog.cacheTable(table, TestCommand())
+      assert(catalog.getCachedTable(table) !== null)
+
+      // Wait until the cache expiration.
+      eventually(timeout(3.seconds)) {
+        // And the cache is gone.
+        assert(catalog.getCachedTable(table) === null)
+      }
+    }
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index aea27bd4c4d7f..b5d800f02862e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.JavaConverters._
@@ -44,7 +45,9 @@ object FileStatusCache {
       session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
       if (sharedCache == null) {
         sharedCache = new SharedInMemoryCache(
-          session.sqlContext.conf.filesourcePartitionFileCacheSize)
+          session.sqlContext.conf.filesourcePartitionFileCacheSize,
+          session.sqlContext.conf.metadataCacheTTL
+        )
       }
       sharedCache.createForNewClient()
     } else {
@@ -89,7 +92,7 @@ abstract class FileStatusCache {
  *
 * @param maxSizeInBytes max allowable cache size before entries start getting evicted
 */
-private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
+private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging {
 
   // Opaque object that uniquely identifies a shared cache user
   private type ClientId = Object
@@ -129,11 +132,17 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
         }
       }
     }
-    CacheBuilder.newBuilder()
+
+    var builder = CacheBuilder.newBuilder()
       .weigher(weigher)
       .removalListener(removalListener)
       .maximumWeight(maxSizeInBytes / weightScale)
-      .build[(ClientId, Path), Array[FileStatus]]()
+
+    if (cacheTTL > 0) {
+      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
+    }
+
+    builder.build[(ClientId, Path), Array[FileStatus]]()
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index ea15f1891b006..040996276063b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileNotFoundException}
 import java.net.URI
 
 import scala.collection.mutable
+import scala.concurrent.duration._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator}
@@ -33,7 +34,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.KnownSizeEstimation
@@ -488,6 +489,31 @@ class FileIndexSuite extends SharedSparkSession {
     val fileIndex = new TestInMemoryFileIndex(spark, path)
     assert(fileIndex.leafFileStatuses.toSeq == statuses)
   }
+
+  test("expire FileStatusCache if TTL is configured") {
+    val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
+    try {
+      // Use 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime.
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
+
+      val path = new Path("/dummy_tmp", "abc")
+      val files = (1 to 3).map(_ => new FileStatus())
+
+      FileStatusCache.resetForTesting()
+      val fileStatusCache = FileStatusCache.getOrCreate(spark)
+      fileStatusCache.putLeafFiles(path, files.toArray)
+
+      // Exactly 3 files are cached.
+      assert(fileStatusCache.getLeafFiles(path).get.length === 3)
+      // Wait until the cache expiration.
+      eventually(timeout(3.seconds)) {
+        // And the cache is gone.
+        assert(fileStatusCache.getLeafFiles(path).isEmpty === true)
+      }
+    } finally {
+      SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
+    }
+  }
 }
 
 object DeletionRaceFileSystem {
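
A minimal usage sketch of the new config, not taken from this patch: the application name, the
60-second TTL and the builder settings below are illustrative assumptions. Because
`spark.sql.metadataCacheTTLSeconds` is a static config, it has to be set when the SparkSession is
built; the Hive-related settings are only needed if the TTL should also cover the partition file
metadata cache, as described in the config's doc string above.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("metadata-cache-ttl-sketch")  // illustrative name
    // Static config: must be set before the session is created, not via spark.conf.set later.
    .config("spark.sql.metadataCacheTTLSeconds", "60")
    // Only needed so the TTL also applies to the partition file metadata cache
    // (this sets spark.sql.catalogImplementation to `hive`).
    .enableHiveSupport()
    .config("spark.sql.hive.manageFilesourcePartitions", "true")
    .getOrCreate()

  // With the TTL in place, entries in the session catalog's table relation cache and in the
  // shared file status cache expire 60 seconds after they are written (Guava expireAfterWrite),
  // instead of living until they are explicitly invalidated or evicted by size.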