@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
import java.net.URI
import java.util.Locale
import java.util.concurrent.Callable
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
@@ -135,7 +136,16 @@ class SessionCatalog(

private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
Contributor:
Not related to this PR, but I'm wondering how useful this cache is. The file listing is already cached elsewhere (FileStatusCache), so this relation cache doesn't seem to give much benefit. cc @viirya @maropu

Member:
Ah, I see. As you suggested, the most painful part (listing files) is already cached there. But some data sources still incur non-trivial processing costs when resolving a relation (e.g., JDBC data sources send a query to an external database for schema resolution), so I think we need to carefully check the performance impact before removing this cache.

LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)

Contributor:
For external data sources, it's common for data to change outside of Spark. I think it's more important to make sure a new query sees the latest data. Maybe we should disable this relation cache by default.

Member:
Hmm, I think this cache is still useful for avoiding re-inferring the schema, which is also an expensive operation.

Contributor:
Ah, that's a good point. We should probably investigate how to design the data source API so that sources that don't need to infer a schema can skip this cache. The JDBC data source is hard to use as-is, since we need to run REFRESH TABLE (or, after this PR, wait for the TTL) whenever the table is changed outside of Spark (which is common for JDBC sources).
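For illustration, here is a minimal, hedged sketch of the two invalidation paths discussed in this thread: an explicit REFRESH TABLE today versus the write-TTL added by this PR. The table name, TTL value, and local master are placeholders, not part of the change:

```scala
import scala.util.Try

import org.apache.spark.sql.SparkSession

object RelationCacheInvalidationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // Static conf from this PR, so it must be set before the session is created.
      .config("spark.sql.metadataCacheTTLSeconds", "60")
      .getOrCreate()

    // Today: explicitly drop the cached relation after the table changes outside Spark.
    // Wrapped in Try because the table here is hypothetical and does not exist.
    Try(spark.sql("REFRESH TABLE some_db.some_jdbc_table"))

    // With this PR: do nothing; the cached entry expires 60 seconds after it was written.
    spark.stop()
  }
}
```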

val cacheSize = conf.tableRelationCacheSize
CacheBuilder.newBuilder().maximumSize(cacheSize).build[QualifiedTableName, LogicalPlan]()
val cacheTTL = conf.metadataCacheTTL

var builder = CacheBuilder.newBuilder()
.maximumSize(cacheSize)

if (cacheTTL > 0) {
builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
}

builder.build[QualifiedTableName, LogicalPlan]()
}
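As an aside, the expireAfterWrite path added above can be exercised deterministically (without sleeping) by plugging in Guava's Ticker. A standalone sketch, not Spark code; key and value types are arbitrary:

```scala
import java.util.concurrent.TimeUnit

import com.google.common.base.Ticker
import com.google.common.cache.CacheBuilder

object ExpireAfterWriteSketch {
  def main(args: Array[String]): Unit = {
    var nowNanos = 0L
    val ticker = new Ticker { override def read(): Long = nowNanos } // manual clock

    val cache = CacheBuilder.newBuilder()
      .expireAfterWrite(1, TimeUnit.SECONDS)
      .ticker(ticker)
      .build[String, String]()

    cache.put("db.table", "plan")
    assert(cache.getIfPresent("db.table") == "plan")

    nowNanos += TimeUnit.SECONDS.toNanos(2) // advance past the 1-second TTL
    assert(cache.getIfPresent("db.table") == null) // the entry has expired
  }
}
```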

/** This method provides a way to get a cached plan. */
@@ -3251,6 +3251,8 @@ class SQLConf extends Serializable with Logging {
def legacyAllowCastNumericToTimestamp: Boolean =
getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP)

def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
@@ -18,6 +18,7 @@
package org.apache.spark.sql.internal

import java.util.Locale
import java.util.concurrent.TimeUnit

import org.apache.spark.util.Utils

@@ -226,4 +227,16 @@ object StaticSQLConf {
.version("3.0.0")
.intConf
.createWithDefault(100)

val METADATA_CACHE_TTL_SECONDS = buildStaticConf("spark.sql.metadataCacheTTLSeconds")
.doc("Time-to-live (TTL) value for the metadata caches: partition file metadata cache and " +
"session catalog cache. This configuration only has an effect when this value having " +
"a positive value (> 0). It also requires setting " +
s"${StaticSQLConf.CATALOG_IMPLEMENTATION} to `hive`, setting " +
s"${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE} > 0 and setting " +
Member:
Could you update the message by using ${SQLConf.HIVE_FILESOURCE_PARTITION_FILE_CACHE_SIZE.key}?

Contributor Author:
That was done in #29194 :)

s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS} to `true` " +
"to be applied to the partition file metadata cache.")
.version("3.1.0")
.timeConf(TimeUnit.SECONDS)
.createWithDefault(-1)
}
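To make the prerequisites in the doc string concrete, here is a hedged sketch of a session that enables the TTL for both caches. The TTL, cache size, and use of a local master are illustrative, and enableHiveSupport() additionally requires Hive classes on the classpath:

```scala
import org.apache.spark.sql.SparkSession

object MetadataCacheTtlConfigSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .enableHiveSupport() // spark.sql.catalogImplementation=hive
      .config("spark.sql.metadataCacheTTLSeconds", "300") // the new static conf (seconds)
      .config("spark.sql.hive.filesourcePartitionFileCacheSize", 262144000L) // must be > 0
      .config("spark.sql.hive.manageFilesourcePartitions", "true") // must be true
      .getOrCreate()

    spark.sql("SHOW TABLES").show()
    spark.stop()
  }
}
```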
@@ -17,15 +17,19 @@

package org.apache.spark.sql.catalyst.catalog

import scala.concurrent.duration._

import org.scalatest.concurrent.Eventually

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.plans.logical.{Command, Range, SubqueryAlias, View}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types._

class InMemorySessionCatalogSuite extends SessionCatalogSuite {
@@ -45,7 +49,7 @@ class InMemorySessionCatalogSuite extends SessionCatalogSuite {
* signatures but do not extend a common parent. This is largely by design but
* unfortunately leads to very similar test code in two places.
*/
abstract class SessionCatalogSuite extends AnalysisTest {
abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
protected val utils: CatalogTestUtils

protected val isHiveExternalCatalog = false
@@ -70,6 +74,16 @@ abstract class SessionCatalogSuite extends AnalysisTest {
catalog.reset()
}
}

private def withConfAndEmptyCatalog(conf: SQLConf)(f: SessionCatalog => Unit): Unit = {
val catalog = new SessionCatalog(newEmptyCatalog(), new SimpleFunctionRegistry(), conf)
catalog.createDatabase(newDb("default"), ignoreIfExists = true)
try {
f(catalog)
} finally {
catalog.reset()
}
}
// --------------------------------------------------------------------------
// Databases
// --------------------------------------------------------------------------
@@ -1641,4 +1655,27 @@ abstract class SessionCatalogSuite extends AnalysisTest {
assert(cause.cause.get.getMessage.contains("Actual error"))
}
}

test("expire table relation cache if TTL is configured") {
case class TestCommand() extends Command

val conf = new SQLConf()
conf.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)

withConfAndEmptyCatalog(conf) { catalog =>
val table = QualifiedTableName(catalog.getCurrentDatabase, "test")

// First, make sure the test table is not cached.
assert(catalog.getCachedTable(table) === null)

catalog.cacheTable(table, TestCommand())
assert(catalog.getCachedTable(table) !== null)

// Wait until the cache expiration.
eventually(timeout(3.seconds)) {
// And the cache is gone.
assert(catalog.getCachedTable(table) === null)
}
}
}
}
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
@@ -44,7 +45,9 @@ object FileStatusCache {
session.sqlContext.conf.filesourcePartitionFileCacheSize > 0) {
if (sharedCache == null) {
sharedCache = new SharedInMemoryCache(
session.sqlContext.conf.filesourcePartitionFileCacheSize)
session.sqlContext.conf.filesourcePartitionFileCacheSize,
session.sqlContext.conf.metadataCacheTTL
)
}
sharedCache.createForNewClient()
} else {
@@ -89,7 +92,7 @@ abstract class FileStatusCache {
*
* @param maxSizeInBytes max allowable cache size before entries start getting evicted
*/
private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging {

// Opaque object that uniquely identifies a shared cache user
private type ClientId = Object
@@ -129,11 +132,17 @@ private class SharedInMemoryCache(maxSizeInBytes: Long) extends Logging {
}
}
}
CacheBuilder.newBuilder()

var builder = CacheBuilder.newBuilder()
.weigher(weigher)
.removalListener(removalListener)
.maximumWeight(maxSizeInBytes / weightScale)
.build[(ClientId, Path), Array[FileStatus]]()

if (cacheTTL > 0) {
builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
}

builder.build[(ClientId, Path), Array[FileStatus]]()
}
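For readers less familiar with Guava's eviction model: weight-based eviction (weigher plus maximumWeight) and a write TTL can coexist on the same cache, which is what the builder above relies on. A standalone sketch, not Spark code, with arbitrary types, sizes, and TTL:

```scala
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, Weigher}

object WeightPlusTtlSketch {
  def main(args: Array[String]): Unit = {
    val weigher = new Weigher[String, String] {
      override def weigh(key: String, value: String): Int = value.length // weight = value size
    }

    val cache = CacheBuilder.newBuilder()
      .weigher(weigher)
      .maximumWeight(10)                      // evict once the total weight exceeds 10
      .expireAfterWrite(60, TimeUnit.SECONDS) // and also drop entries 60 seconds after insertion
      .build[String, String]()

    cache.put("a", "xxxx")     // weight 4
    cache.put("b", "xxxxxxxx") // weight 8: total exceeds 10, so an entry is evicted
    println(cache.asMap().keySet())
  }
}
```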


@@ -21,6 +21,7 @@ import java.io.{File, FileNotFoundException}
import java.net.URI

import scala.collection.mutable
import scala.concurrent.duration._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path, RawLocalFileSystem, RemoteIterator}
@@ -33,7 +34,7 @@ import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.util.KnownSizeEstimation
@@ -488,6 +489,31 @@ class FileIndexSuite extends SharedSparkSession {
val fileIndex = new TestInMemoryFileIndex(spark, path)
assert(fileIndex.leafFileStatuses.toSeq == statuses)
}

test("expire FileStatusCache if TTL is configured") {
val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS)
try {
// using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime
SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)

val path = new Path("/dummy_tmp", "abc")
val files = (1 to 3).map(_ => new FileStatus())

FileStatusCache.resetForTesting()
val fileStatusCache = FileStatusCache.getOrCreate(spark)
fileStatusCache.putLeafFiles(path, files.toArray)

// Exactly 3 files are cached.
assert(fileStatusCache.getLeafFiles(path).get.length === 3)
// Wait until the cache expiration.
eventually(timeout(3.seconds)) {
// And the cache is gone.
assert(fileStatusCache.getLeafFiles(path).isEmpty === true)
}
} finally {
SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue)
}
}
}

object DeletionRaceFileSystem {
Expand Down