From 65e4ebd38af17d8798b2029fa85d42f7cb1ca47d Mon Sep 17 00:00:00 2001 From: Yaroslav Tkachenko Date: Wed, 17 Jun 2020 14:00:21 -0700 Subject: [PATCH 1/8] [SPARK-30616][SQL] Introduce TTL config option for SQL Metadata Cache --- .../org/apache/spark/sql/internal/SQLConf.scala | 11 +++++++++++ .../execution/datasources/FileStatusCache.scala | 17 +++++++++++++---- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 3149d14c1ddcc..11e49e2ef59ed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -835,6 +835,15 @@ object SQLConf { .longConf .createWithDefault(250 * 1024 * 1024) + val HIVE_FILESOURCE_PARTITION_FILE_CACHE_TTL = + buildConf("spark.sql.hive.filesourcePartitionFileCacheTTL") + .doc("When positive, it's used as a TTL (time-to-live) value for the partition file " + + "metadata cache. This conf only has an effect when hive filesource partition management " + + "is enabled.") + .version("3.1.0") + .timeConf(TimeUnit.SECONDS) + .createWithDefault(-1) + object HiveCaseSensitiveInferenceMode extends Enumeration { val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value } @@ -2900,6 +2909,8 @@ class SQLConf extends Serializable with Logging { def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE) + def filesourcePartitionFileCacheTTL: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_TTL) + def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value = HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala index aea27bd4c4d7f..c1bad66a2f748 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ @@ -44,7 +45,9 @@ object FileStatusCache { session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) { if (sharedCache == null) { sharedCache = new SharedInMemoryCache( - session.sqlContext.conf.filesourcePartitionFileCacheSize) + session.sqlContext.conf.filesourcePartitionFileCacheSize, + session.sqlContext.conf.filesourcePartitionFileCacheTTL + ) } sharedCache.createForNewClient() } else { @@ -89,7 +92,7 @@ abstract class FileStatusCache { * * @param maxSizeInBytes max allowable cache size before entries start getting evicted */ -private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging { +private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging { // Opaque object that uniquely identifies a shared cache user private type ClientId = Object @@ -129,11 +132,17 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging { } } } - CacheBuilder.newBuilder() + + var builder = CacheBuilder.newBuilder() .weigher(weigher) .removalListener(removalListener) .maximumWeight(maxSizeInBytes / weightScale) - .build[(ClientId, Path), Array[FileStatus]]() + + if (cacheTTL > 0) { + builder = 
builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS) + } + + builder.build[(ClientId, Path), Array[FileStatus]]() } From ea7bf0f0c47279689bb3bd187157d6e90bc2e8d2 Mon Sep 17 00:00:00 2001 From: Yaroslav Tkachenko Date: Thu, 18 Jun 2020 09:35:40 -0700 Subject: [PATCH 2/8] [SPARK-30616][SQL] Update doc string --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 11e49e2ef59ed..6a6853253f947 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -837,9 +837,9 @@ object SQLConf { val HIVE_FILESOURCE_PARTITION_FILE_CACHE_TTL = buildConf("spark.sql.hive.filesourcePartitionFileCacheTTL") - .doc("When positive, it's used as a TTL (time-to-live) value for the partition file " + - "metadata cache. This conf only has an effect when hive filesource partition management " + - "is enabled.") + .doc("Time-to-live (TTL) value for the partition file metadata cache. This configuration " + + "only has an effect when this value having a positive value and setting `hive` to " + + s"${StaticSQLConf.CATALOG_IMPLEMENTATION}.") .version("3.1.0") .timeConf(TimeUnit.SECONDS) .createWithDefault(-1) From 28da5cfd39dba6a8319dd1cdfe39e51ed5cbdea5 Mon Sep 17 00:00:00 2001 From: Yaroslav Tkachenko Date: Wed, 24 Jun 2020 16:18:14 -0700 Subject: [PATCH 3/8] [SPARK-30616][SQL] Make a common cache TTL option + add tests --- .../sql/catalyst/catalog/SessionCatalog.scala | 12 +++++- .../apache/spark/sql/internal/SQLConf.scala | 24 ++++++----- .../datasources/FileStatusCache.scala | 2 +- .../datasources/FileIndexSuite.scala | 21 ++++++++++ .../sql/hive/HiveMetadataCacheSuite.scala | 42 ++++++++++++++++++- 5 files changed, 86 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b79857cdccd22..2b3f05f61b483 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog import java.net.URI import java.util.Locale import java.util.concurrent.Callable +import java.util.concurrent.TimeUnit import javax.annotation.concurrent.GuardedBy import scala.collection.mutable @@ -135,7 +136,16 @@ class SessionCatalog( private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = { val cacheSize = conf.tableRelationCacheSize - CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]() + val cacheTTL = conf.metadataCacheTTL + + var builder = CacheBuilder.newBuilder() + .maximumSize(cacheSize) + + if (cacheTTL > 0) { + builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS) + } + + builder.build[QualifiedTableName, LogicalPlan]() } /** This method provides a way to get a cached plan. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6a6853253f947..6f8744bbcab9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -835,15 +835,6 @@ object SQLConf { .longConf .createWithDefault(250 * 1024 * 1024) - val HIVE_FILESOURCE_PARTITION_FILE_CACHE_TTL = - buildConf("spark.sql.hive.filesourcePartitionFileCacheTTL") - .doc("Time-to-live (TTL) value for the partition file metadata cache. This configuration " + - "only has an effect when this value having a positive value and setting `hive` to " + - s"${StaticSQLConf.CATALOG_IMPLEMENTATION}.") - .version("3.1.0") - .timeConf(TimeUnit.SECONDS) - .createWithDefault(-1) - object HiveCaseSensitiveInferenceMode extends Enumeration { val INFER_AND_SAVE, INFER_ONLY, NEVER_INFER = Value } @@ -2665,6 +2656,17 @@ object SQLConf { .checkValue(_ > 0, "The difference must be positive.") .createWithDefault(4) + val METADATA_CACHE_TTL = + buildConf("spark.sql.metadataCacheTTL") + .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " + + "session catalog cache. This configuration only has an effect when this value having " + + "a positive value. It also requires setting `hive` to " + + s"${StaticSQLConf.CATALOG_IMPLEMENTATION} to be applied to the partition file " + + "metadata cache.") + .version("3.1.0") + .timeConf(TimeUnit.SECONDS) + .createWithDefault(-1) + /** * Holds information about keys that have been deprecated. * @@ -2909,8 +2911,6 @@ class SQLConf extends Serializable with Logging { def filesourcePartitionFileCacheSize: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE) - def filesourcePartitionFileCacheTTL: Long = getConf(HIVE_FILESOURCE_PARTITION_FILE_CACHE_TTL) - def caseSensitiveInferenceMode: HiveCaseSensitiveInferenceMode.Value = HiveCaseSensitiveInferenceMode.withName(getConf(HIVE_CASE_SENSITIVE_INFERENCE)) @@ -3262,6 +3262,8 @@ class SQLConf extends Serializable with Logging { def legacyAllowCastNumericToTimestamp: Boolean = getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP) + def metadataCacheTTL: Long = getConf(METADATA_CACHE_TTL) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala index c1bad66a2f748..b5d800f02862e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala @@ -46,7 +46,7 @@ object FileStatusCache { if (sharedCache == null) { sharedCache = new SharedInMemoryCache( session.sqlContext.conf.filesourcePartitionFileCacheSize, - session.sqlContext.conf.filesourcePartitionFileCacheTTL + session.sqlContext.conf.metadataCacheTTL ) } sharedCache.createForNewClient() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index ea15f1891b006..f92a09bae625c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -30,6 +30,7 @@ import org.mockito.Mockito.{mock, when} import org.apache.spark.SparkException import org.apache.spark.metrics.source.HiveCatalogMetrics +import org.apache.spark.sql.LocalSparkSession.withSparkSession import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.functions.col @@ -488,6 +489,26 @@ class FileIndexSuite extends SharedSparkSession { val fileIndex = new TestInMemoryFileIndex(spark, path) assert(fileIndex.leafFileStatuses.toSeq == statuses) } + + test("expire FileStatusCache if TTL is configured") { + val sparkConfWithTTl = sparkConf.set(SQLConf.METADATA_CACHE_TTL.key, "1") + + withSparkSession(SparkSession.builder.config(sparkConfWithTTl).getOrCreate()) { spark => + val path = new Path("/tmp", "abc") + val files = (1 to 3).map(_ => new FileStatus()) + + FileStatusCache.resetForTesting() + val fileStatusCache = FileStatusCache.getOrCreate(spark) + fileStatusCache.putLeafFiles(path, files.toArray) + + // Exactly 3 files are cached. + assert(fileStatusCache.getLeafFiles(path).get.length === 3) + // Wait until the cache expiration. + Thread.sleep(1500) // 1.5 seconds > 1 second + // And the cache is gone. 
+ assert(fileStatusCache.getLeafFiles(path) === None) + } + } } object DeletionRaceFileSystem { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala index db8ebcd45f3eb..1fec3e01b6067 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala @@ -19,8 +19,11 @@ package org.apache.spark.sql.hive import org.apache.hadoop.fs.Path -import org.apache.spark.SparkException -import org.apache.spark.sql.QueryTest +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.{QueryTest, SparkSession} +import org.apache.spark.sql.LocalSparkSession.withSparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -126,4 +129,39 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi for (pruningEnabled <- Seq(true, false)) { testCaching(pruningEnabled) } + + test("cache TTL") { + val sparkConfWithTTl = new SparkConf().set(SQLConf.METADATA_CACHE_TTL.key, "1") + val newSession = SparkSession.builder.config(sparkConfWithTTl).getOrCreate().cloneSession() + + withSparkSession(newSession) { implicit spark => + withTable("test_ttl") { + withTempDir { dir => + spark.sql(s""" + |create external table test_ttl (id long) + |partitioned by (f1 int, f2 int) + |stored as parquet + |location "${dir.toURI}"""".stripMargin) + + val tableIdentifier = TableIdentifier("test_ttl", Some("default")) + + // First, make sure the test table is not cached. + assert(getCachedDataSourceTable(tableIdentifier) === null) + // This query will make the table cached. + spark.sql("select * from test_ttl") + assert(getCachedDataSourceTable(tableIdentifier) !== null) + // Wait until the cache expiration. + Thread.sleep(1500L) // 1.5 seconds > 1 second. + // And the cache is gone. 
+ assert(getCachedDataSourceTable(tableIdentifier) === null) + } + } + } + } + + private def getCachedDataSourceTable(table: TableIdentifier) + (implicit spark: SparkSession): LogicalPlan = { + spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog + .getCachedDataSourceTable(table) + } } From 7d94f943496fb463f18cd445ccbde3f5225c8e7b Mon Sep 17 00:00:00 2001 From: Yaroslav Tkachenko Date: Thu, 25 Jun 2020 10:36:17 -0700 Subject: [PATCH 4/8] [SPARK-30616][SQL] Addressing code review feedback --- .../apache/spark/sql/internal/SQLConf.scala | 3 +-- .../datasources/FileIndexSuite.scala | 8 ++++--- .../sql/hive/HiveMetadataCacheSuite.scala | 21 +++++++++++-------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6f8744bbcab9c..f8d42e2be4e90 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2656,8 +2656,7 @@ object SQLConf { .checkValue(_ > 0, "The difference must be positive.") .createWithDefault(4) - val METADATA_CACHE_TTL = - buildConf("spark.sql.metadataCacheTTL") + val METADATA_CACHE_TTL = buildConf("spark.sql.metadataCacheTTL") .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " + "session catalog cache. This configuration only has an effect when this value having " + "a positive value. It also requires setting `hive` to " + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index f92a09bae625c..2d65d300576ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -21,6 +21,7 @@ import java.io.{File, FileNotFoundException} import java.net.URI import scala.collection.mutable +import scala.concurrent.duration._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator} @@ -504,9 +505,10 @@ class FileIndexSuite extends SharedSparkSession { // Exactly 3 files are cached. assert(fileStatusCache.getLeafFiles(path).get.length === 3) // Wait until the cache expiration. - Thread.sleep(1500) // 1.5 seconds > 1 second - // And the cache is gone. - assert(fileStatusCache.getLeafFiles(path) === None) + eventually(timeout(2.seconds)) { + // And the cache is gone. 
+ assert(fileStatusCache.getLeafFiles(path).isEmpty === true) + } } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala index 1fec3e01b6067..9aeb7019235eb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import scala.concurrent.duration._ + import org.apache.hadoop.fs.Path import org.apache.spark.{SparkConf, SparkException} @@ -130,7 +132,7 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi testCaching(pruningEnabled) } - test("cache TTL") { + test("expire cached metadata if TTL is configured") { val sparkConfWithTTl = new SparkConf().set(SQLConf.METADATA_CACHE_TTL.key, "1") val newSession = SparkSession.builder.config(sparkConfWithTTl).getOrCreate().cloneSession() @@ -138,10 +140,10 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi withTable("test_ttl") { withTempDir { dir => spark.sql(s""" - |create external table test_ttl (id long) - |partitioned by (f1 int, f2 int) - |stored as parquet - |location "${dir.toURI}"""".stripMargin) + |CREATE EXTERNAL TABLE test_ttl (id long) + |PARTITIONED BY (f1 int, f2 int) + |STORED AS PARQUET + |LOCATION "${dir.toURI}"""".stripMargin) val tableIdentifier = TableIdentifier("test_ttl", Some("default")) @@ -151,16 +153,17 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi spark.sql("select * from test_ttl") assert(getCachedDataSourceTable(tableIdentifier) !== null) // Wait until the cache expiration. - Thread.sleep(1500L) // 1.5 seconds > 1 second. - // And the cache is gone. - assert(getCachedDataSourceTable(tableIdentifier) === null) + eventually(timeout(2.seconds)) { + // And the cache is gone. 
+ assert(getCachedDataSourceTable(tableIdentifier) === null) + } } } } } private def getCachedDataSourceTable(table: TableIdentifier) - (implicit spark: SparkSession): LogicalPlan = { + (implicit spark: SparkSession): LogicalPlan = { spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog .getCachedDataSourceTable(table) } From ec18c335cd5d2e038766f65b4109e170290afb99 Mon Sep 17 00:00:00 2001 From: Yaroslav Tkachenko Date: Thu, 25 Jun 2020 10:47:54 -0700 Subject: [PATCH 5/8] [SPARK-30616][SQL] Addressing code review feedback --- .../spark/sql/execution/datasources/FileIndexSuite.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 2d65d300576ee..a82ea4305b908 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -31,7 +31,6 @@ import org.mockito.Mockito.{mock, when} import org.apache.spark.SparkException import org.apache.spark.metrics.source.HiveCatalogMetrics -import org.apache.spark.sql.LocalSparkSession.withSparkSession import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.functions.col @@ -492,9 +491,7 @@ class FileIndexSuite extends SharedSparkSession { } test("expire FileStatusCache if TTL is configured") { - val sparkConfWithTTl = sparkConf.set(SQLConf.METADATA_CACHE_TTL.key, "1") - - withSparkSession(SparkSession.builder.config(sparkConfWithTTl).getOrCreate()) { spark => + withSQLConf(SQLConf.METADATA_CACHE_TTL.key -> "1") { val path = new Path("/tmp", "abc") val files = (1 to 3).map(_ => new FileStatus()) From 18feeb02b11407b9eea1457139a91cb40ddc40cf Mon Sep 17 00:00:00 2001 From: Yaroslav Tkachenko Date: Sun, 28 Jun 2020 22:23:25 -0700 Subject: [PATCH 6/8] [SPARK-30616][SQL] Addressing code review feedback --- .../apache/spark/sql/internal/SQLConf.scala | 12 +---- .../spark/sql/internal/StaticSQLConf.scala | 13 ++++++ .../catalog/SessionCatalogSuite.scala | 45 +++++++++++++++++-- .../datasources/FileIndexSuite.scala | 13 ++++-- .../sql/hive/HiveMetadataCacheSuite.scala | 45 +------------------ 5 files changed, 66 insertions(+), 62 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index f8d42e2be4e90..b180ade88b6af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2656,16 +2656,6 @@ object SQLConf { .checkValue(_ > 0, "The difference must be positive.") .createWithDefault(4) - val METADATA_CACHE_TTL = buildConf("spark.sql.metadataCacheTTL") - .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " + - "session catalog cache. This configuration only has an effect when this value having " + - "a positive value. It also requires setting `hive` to " + - s"${StaticSQLConf.CATALOG_IMPLEMENTATION} to be applied to the partition file " + - "metadata cache.") - .version("3.1.0") - .timeConf(TimeUnit.SECONDS) - .createWithDefault(-1) - /** * Holds information about keys that have been deprecated. 
* @@ -3261,7 +3251,7 @@ class SQLConf extends Serializable with Logging { def legacyAllowCastNumericToTimestamp: Boolean = getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP) - def metadataCacheTTL: Long = getConf(METADATA_CACHE_TTL) + def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index 9618ff6062635..df4c67a427e38 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.internal import java.util.Locale +import java.util.concurrent.TimeUnit import org.apache.spark.util.Utils @@ -226,4 +227,16 @@ object StaticSQLConf { .version("3.0.0") .intConf .createWithDefault(100) + + val METADATA_CACHE_TTL = buildStaticConf("spark.sql.metadataCacheTTL") + .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " + + "session catalog cache. This configuration only has an effect when this value having " + + "a positive value (> 0). It also requires setting " + + s"${StaticSQLConf.CATALOG_IMPLEMENTATION} to `hive`, setting " + + s"${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE} > 0 and setting " + + s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS} to `true` " + + s"to be applied to the partition file metadata cache.") + .version("3.1.0") + .timeConf(TimeUnit.SECONDS) + .createWithDefault(-1) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 4d88a8d7ee546..a31712428df8b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -17,15 +17,19 @@ package org.apache.spark.sql.catalyst.catalog +import scala.concurrent.duration._ + +import org.scalatest.concurrent.Eventually + import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View} +import org.apache.spark.sql.catalyst.plans.logical.{Command, Range, SubqueryAlias, View} import org.apache.spark.sql.connector.catalog.CatalogManager import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types._ class InMemorySessionCatalogSuite extends SessionCatalogSuite { @@ -45,7 +49,7 @@ class InMemorySessionCatalogSuite extends SessionCatalogSuite { * signatures but do not extend a common parent. This is largely by design but * unfortunately leads to very similar test code in two places. 
*/ -abstract class SessionCatalogSuite extends AnalysisTest { +abstract class SessionCatalogSuite extends AnalysisTest with Eventually { protected val utils: CatalogTestUtils protected val isHiveExternalCatalog = false @@ -70,6 +74,16 @@ abstract class SessionCatalogSuite extends AnalysisTest { catalog.reset() } } + + private def withConfAndEmptyCatalog(conf: SQLConf)(f: SessionCatalog => Unit): Unit = { + val catalog = new SessionCatalog(newEmptyCatalog(), new SimpleFunctionRegistry(), conf) + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + try { + f(catalog) + } finally { + catalog.reset() + } + } // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- @@ -1641,4 +1655,27 @@ abstract class SessionCatalogSuite extends AnalysisTest { assert(cause.cause.get.getMessage.contains("Actual error")) } } + + test("expire table relation cache if TTL is configured") { + case class TestCommand() extends Command + + val conf = new SQLConf() + conf.setConf(StaticSQLConf.METADATA_CACHE_TTL, 1L) + + withConfAndEmptyCatalog(conf) { catalog => + val table = QualifiedTableName(catalog.getCurrentDatabase, "test") + + // First, make sure the test table is not cached. + assert(catalog.getCachedTable(table) === null) + + catalog.cacheTable(table, TestCommand()) + assert(catalog.getCachedTable(table) !== null) + + // Wait until the cache expiration. + eventually(timeout(3.seconds)) { + // And the cache is gone. + assert(catalog.getCachedTable(table) === null) + } + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index a82ea4305b908..d4d1abe030456 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.functions.col -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.util.KnownSizeEstimation @@ -491,8 +491,11 @@ class FileIndexSuite extends SharedSparkSession { } test("expire FileStatusCache if TTL is configured") { - withSQLConf(SQLConf.METADATA_CACHE_TTL.key -> "1") { - val path = new Path("/tmp", "abc") + val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL) + try { + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL, 1L) + + val path = new Path("/dummy_tmp", "abc") val files = (1 to 3).map(_ => new FileStatus()) FileStatusCache.resetForTesting() @@ -502,10 +505,12 @@ class FileIndexSuite extends SharedSparkSession { // Exactly 3 files are cached. assert(fileStatusCache.getLeafFiles(path).get.length === 3) // Wait until the cache expiration. - eventually(timeout(2.seconds)) { + eventually(timeout(3.seconds)) { // And the cache is gone. 
assert(fileStatusCache.getLeafFiles(path).isEmpty === true) } + } finally { + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL, previousValue) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala index 9aeb7019235eb..db8ebcd45f3eb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala @@ -17,15 +17,10 @@ package org.apache.spark.sql.hive -import scala.concurrent.duration._ - import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkConf, SparkException} -import org.apache.spark.sql.{QueryTest, SparkSession} -import org.apache.spark.sql.LocalSparkSession.withSparkSession -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.SparkException +import org.apache.spark.sql.QueryTest import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils @@ -131,40 +126,4 @@ class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSi for (pruningEnabled <- Seq(true, false)) { testCaching(pruningEnabled) } - - test("expire cached metadata if TTL is configured") { - val sparkConfWithTTl = new SparkConf().set(SQLConf.METADATA_CACHE_TTL.key, "1") - val newSession = SparkSession.builder.config(sparkConfWithTTl).getOrCreate().cloneSession() - - withSparkSession(newSession) { implicit spark => - withTable("test_ttl") { - withTempDir { dir => - spark.sql(s""" - |CREATE EXTERNAL TABLE test_ttl (id long) - |PARTITIONED BY (f1 int, f2 int) - |STORED AS PARQUET - |LOCATION "${dir.toURI}"""".stripMargin) - - val tableIdentifier = TableIdentifier("test_ttl", Some("default")) - - // First, make sure the test table is not cached. - assert(getCachedDataSourceTable(tableIdentifier) === null) - // This query will make the table cached. - spark.sql("select * from test_ttl") - assert(getCachedDataSourceTable(tableIdentifier) !== null) - // Wait until the cache expiration. - eventually(timeout(2.seconds)) { - // And the cache is gone. 
- assert(getCachedDataSourceTable(tableIdentifier) === null) - } - } - } - } - } - - private def getCachedDataSourceTable(table: TableIdentifier) - (implicit spark: SparkSession): LogicalPlan = { - spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog - .getCachedDataSourceTable(table) - } } From 1d5248e1df9e0e9068fbec60ef6fcc7639c4a4a7 Mon Sep 17 00:00:00 2001 From: Yaroslav Tkachenko Date: Tue, 30 Jun 2020 09:36:12 -0700 Subject: [PATCH 7/8] [SPARK-30616][SQL] Addressing code review feedback --- .../scala/org/apache/spark/sql/internal/StaticSQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index df4c67a427e38..ac99a8249fc3a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -235,7 +235,7 @@ object StaticSQLConf { s"${StaticSQLConf.CATALOG_IMPLEMENTATION} to `hive`, setting " + s"${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE} > 0 and setting " + s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS} to `true` " + - s"to be applied to the partition file metadata cache.") + "to be applied to the partition file metadata cache.") .version("3.1.0") .timeConf(TimeUnit.SECONDS) .createWithDefault(-1) From 3e761dcd790b9c30e5cee7bffe916dfc2c82b7a5 Mon Sep 17 00:00:00 2001 From: Yaroslav Tkachenko Date: Thu, 16 Jul 2020 21:15:45 -0700 Subject: [PATCH 8/8] [SPARK-30616][SQL] Addressing code review feedback --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- .../org/apache/spark/sql/internal/StaticSQLConf.scala | 2 +- .../spark/sql/catalyst/catalog/SessionCatalogSuite.scala | 2 +- .../spark/sql/execution/datasources/FileIndexSuite.scala | 7 ++++--- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b180ade88b6af..90dc0b523908e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3251,7 +3251,7 @@ class SQLConf extends Serializable with Logging { def legacyAllowCastNumericToTimestamp: Boolean = getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP) - def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL) + def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) /** ********************** SQLConf functionality methods ************ */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala index ac99a8249fc3a..9bc910b5e1cdf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala @@ -228,7 +228,7 @@ object StaticSQLConf { .intConf .createWithDefault(100) - val METADATA_CACHE_TTL = buildStaticConf("spark.sql.metadataCacheTTL") + val METADATA_CACHE_TTL_SECONDS = buildStaticConf("spark.sql.metadataCacheTTLSeconds") .doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " + "session catalog cache. This configuration only has an effect when this value having " + "a positive value (> 0). 
It also requires setting " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index a31712428df8b..ad40cc010361c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -1660,7 +1660,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually { case class TestCommand() extends Command val conf = new SQLConf() - conf.setConf(StaticSQLConf.METADATA_CACHE_TTL, 1L) + conf.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L) withConfAndEmptyCatalog(conf) { catalog => val table = QualifiedTableName(catalog.getCurrentDatabase, "test") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index d4d1abe030456..040996276063b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -491,9 +491,10 @@ class FileIndexSuite extends SharedSparkSession { } test("expire FileStatusCache if TTL is configured") { - val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL) + val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) try { - SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL, 1L) + // using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L) val path = new Path("/dummy_tmp", "abc") val files = (1 to 3).map(_ => new FileStatus()) @@ -510,7 +511,7 @@ class FileIndexSuite extends SharedSparkSession { assert(fileStatusCache.getLeafFiles(path).isEmpty === true) } } finally { - SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL, previousValue) + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) } } }
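The patches above all reduce to the same pattern: conditionally chaining Guava's expireAfterWrite onto a CacheBuilder when the configured TTL is positive, and leaving the builder untouched for the default of -1. The standalone sketch below illustrates that pattern outside of Spark. It is illustrative only: the MetadataCacheTtlSketch object, the buildCache helper, and the String key / Array[String] value types are stand-ins for the real SQLConf/FileStatusCache plumbing, and it assumes Guava is on the classpath.

import java.util.concurrent.TimeUnit

import com.google.common.cache.{Cache, CacheBuilder}

object MetadataCacheTtlSketch {

  // Build a size-bounded cache, adding write-based expiration only when cacheTTL > 0,
  // mirroring how SharedInMemoryCache and SessionCatalog apply the TTL config above.
  def buildCache(maxEntries: Long, cacheTTL: Long): Cache[String, Array[String]] = {
    var builder = CacheBuilder.newBuilder().maximumSize(maxEntries)
    if (cacheTTL > 0) {
      builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
    }
    builder.build[String, Array[String]]()
  }

  def main(args: Array[String]): Unit = {
    val cache = buildCache(maxEntries = 100, cacheTTL = 1)
    cache.put("default.test_ttl", Array("f1=1/part-0.parquet", "f1=2/part-1.parquet"))
    // Entry is present immediately after the write...
    println(cache.getIfPresent("default.test_ttl") != null)  // true
    Thread.sleep(1500)  // sleep past the 1-second TTL
    // ...and is gone once the TTL has elapsed since the write.
    println(cache.getIfPresent("default.test_ttl") == null)  // true
  }
}

The tests in the later patches avoid this kind of fixed sleep by polling with ScalaTest's eventually(timeout(...)), which is the more robust choice when the expiration time is only a lower bound.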