
Commit e35cd3f

wangyum authored and GitHub Enterprise committed
[CARMEL-7516][CARMEL-5431] Limit the cached FileStatus size in driver memory (apache#163)
1 parent b6b76e1 commit e35cd3f
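In short: instead of capping the table relation cache only by entry count, this change caps it by estimated memory footprint. Cached values become (LogicalPlan, Int) pairs whose Int is the plan's estimated size in bytes, a Guava Weigher uses that size as the entry's weight against a new static conf (spark.sql.filesourceTableRelationCache.memory.size, 200 MB by default), and for HadoopFsRelation plans the estimate is computed from the cached FileStatus listing via SizeEstimator. Illustrative sketches follow the relevant file diffs below.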

6 files changed: +116 −14

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 41 additions & 9 deletions
@@ -26,7 +26,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable
 import scala.util.{Failure, Success, Try}
 
-import com.google.common.cache.{Cache, CacheBuilder}
+import com.google.common.cache.{Cache, CacheBuilder, RemovalCause, RemovalListener, RemovalNotification, Weigher}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
@@ -72,6 +72,7 @@ class SessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionExpressionBuilder: FunctionExpressionBuilder,
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
+    cacheMemorySizeInBytes: Int = SQLConf.get.tableRelationCacheMemorySizeInBytes,
     cacheTTL: Long = SQLConf.get.metadataCacheTTL,
     defaultDatabase: String = SQLConf.get.defaultDatabase,
     scratchSessionPath: Option[Path] = None,
@@ -95,6 +96,7 @@ class SessionCatalog(
       DummyFunctionResourceLoader,
       DummyFunctionExpressionBuilder,
       conf.tableRelationCacheSize,
+      conf.tableRelationCacheMemorySizeInBytes,
       conf.metadataCacheTTL,
       conf.defaultDatabase)
   }
@@ -152,6 +154,8 @@ class SessionCatalog(
 
   private val validNameFormat = "([\\w_]+)".r
 
+  def defaultPlanCacheSize: Int = cacheMemorySizeInBytes / cacheSize
+
   /**
    * Checks if the given name conforms the Hive standard ("[a-zA-Z_0-9]+"),
    * i.e. if this name only contains characters, numbers, and _.
@@ -210,33 +214,61 @@ class SessionCatalog(
     }
   }
 
-  private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
+  private val tableRelationCache: Cache[QualifiedTableName, (LogicalPlan, Int)] = {
     var builder = CacheBuilder.newBuilder()
-      .maximumSize(cacheSize)
+
+    if (cacheMemorySizeInBytes > 0) {
+      val weigher = new Weigher[QualifiedTableName, (LogicalPlan, Int)] {
+        override def weigh(key: QualifiedTableName, value: (LogicalPlan, Int)): Int = {
+          value._2
+        }
+      }
+
+      val removeListener = new RemovalListener[QualifiedTableName, (LogicalPlan, Int)]() {
+        override def onRemoval(
+            removed: RemovalNotification[QualifiedTableName, (LogicalPlan, Int)]): Unit = {
+          if (removed.getCause == RemovalCause.SIZE) {
+            logWarning(
+              "Evicting a cached session relation from memory due to the size constraint " +
                "(spark.sql.filesourceTableRelationCache.memory.size = " +
                cacheMemorySizeInBytes + " bytes)." +
                " This may impact subsequent planning performance.")
+          }
+        }
+      }
+      builder.weigher(weigher)
+        .removalListener(removeListener)
+        .maximumWeight(cacheMemorySizeInBytes)
+    } else {
+      builder.maximumSize(cacheSize)
+    }
 
     if (cacheTTL > 0) {
       builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
     }
 
-    builder.build[QualifiedTableName, LogicalPlan]()
+    builder.build[QualifiedTableName, (LogicalPlan, Int)]()
   }
 
   private val listener = new ExternalCatalogListener(this)
   liveBus.map(_.addToExternalCatalogQueue(listener))
 
   /** This method provides a way to get a cached plan. */
-  def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
-    tableRelationCache.get(t, c)
+  def getCachedPlan(t: QualifiedTableName, c: Callable[(LogicalPlan, Int)]): LogicalPlan = {
+    tableRelationCache.get(t, c)._1
   }
 
   /** This method provides a way to get a cached plan if the key exists. */
   def getCachedTable(key: QualifiedTableName): LogicalPlan = {
-    tableRelationCache.getIfPresent(key)
+    Option(tableRelationCache.getIfPresent(key)).map(_._1).orNull
   }
 
   /** This method provides a way to cache a plan. */
-  def cacheTable(t: QualifiedTableName, l: LogicalPlan): Unit = {
-    tableRelationCache.put(t, l)
+  def cacheTable(
+      t: QualifiedTableName,
+      l: LogicalPlan,
+      estimatePlanSize: Int = defaultPlanCacheSize): Unit = {
+    tableRelationCache.put(t, (l, estimatePlanSize))
   }
 
   /** This method provides a way to invalidate a cached plan. */
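To see the weigher-based eviction in isolation, here is a minimal, self-contained sketch of the same Guava technique using plain String keys and values. All names and byte counts are invented for illustration; only the CacheBuilder, Weigher, RemovalCause, and RemovalListener APIs mirror the patch. Note also that defaultPlanCacheSize above is integer division: with the defaults of 200 * 1024 * 1024 bytes and 1000 entries it works out to 209715 bytes per plan.

import com.google.common.cache.{CacheBuilder, RemovalCause, RemovalListener, RemovalNotification, Weigher}

object WeightedCacheSketch {
  def main(args: Array[String]): Unit = {
    // Each value carries its own estimated size in bytes as the second tuple element.
    val weigher = new Weigher[String, (String, Int)] {
      override def weigh(key: String, value: (String, Int)): Int = value._2
    }
    // Report size-based evictions, as the patch does with logWarning.
    val listener = new RemovalListener[String, (String, Int)] {
      override def onRemoval(n: RemovalNotification[String, (String, Int)]): Unit = {
        if (n.getCause == RemovalCause.SIZE) {
          println(s"evicted ${n.getKey}: cache weight limit reached")
        }
      }
    }
    val cache = CacheBuilder.newBuilder()
      .weigher(weigher)
      .removalListener(listener)
      .maximumWeight(1000L) // total budget, in the same units the weigher returns
      .build[String, (String, Int)]()

    cache.put("db.t1", ("plan1", 600))
    cache.put("db.t2", ("plan2", 600)) // 600 + 600 > 1000, so Guava evicts an entry
    println(Option(cache.getIfPresent("db.t1")))
  }
}

As in the patch, the weight comes from the value itself, so callers that can estimate an entry's footprint get byte-accurate eviction while everyone else falls back to the fixed per-plan average.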

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 0 deletions
@@ -5150,6 +5150,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def tableRelationCacheSize: Int =
     getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
 
+  def tableRelationCacheMemorySizeInBytes: Int =
+    getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_MEM_SIZE_IN_BYTES)
+
   def codegenCacheMaxEntries: Int = getConf(StaticSQLConf.CODEGEN_CACHE_MAX_ENTRIES)
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala

Lines changed: 12 additions & 0 deletions
@@ -92,6 +92,18 @@ object StaticSQLConf {
     .checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative")
     .createWithDefault(1000)
 
+  val FILESOURCE_TABLE_RELATION_CACHE_MEM_SIZE_IN_BYTES =
+    buildStaticConf("spark.sql.filesourceTableRelationCache.memory.size")
+      .internal()
+      .doc("The maximum memory size of the cache that maps qualified table names to" +
+        " table relation plans. Compared to spark.sql.filesourceTableRelationCacheSize," +
+        " this conf limits the cache by memory size rather than by number of entries, so" +
+        " that cached relations cannot consume too much driver memory.")
+      .version("3.2.0")
+      .intConf
+      .checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative")
+      .createWithDefault(200 * 1024 * 1024)
+
   val CODEGEN_CACHE_MAX_ENTRIES = buildStaticConf("spark.sql.codegen.cache.maxEntries")
     .internal()
     .doc("When nonzero, enable caching of generated classes for operators and expressions. " +

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala

Lines changed: 37 additions & 1 deletion
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.{AliasIdentifier, FunctionIdentifier, Quali
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.{LeafCommand, LogicalPlan, Project, Range, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LeafCommand, LogicalPlan, Project, Range, SubqueryAlias, View}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
@@ -1922,4 +1922,40 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
       "use temporary table")
     }
   }
+
+  test("Remove table relation cache entries when the size limit is exceeded") {
+    case class TestCommand() extends Command {
+      override def children: Seq[LogicalPlan] = Nil
+
+      override protected def withNewChildrenInternal(
+          newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = { this }
+    }
+
+    val conf = new SQLConf()
+    conf.setConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_MEM_SIZE_IN_BYTES, 1000000)
+
+    withConfAndEmptyCatalog(conf) { catalog =>
+      val table1 = QualifiedTableName(catalog.getCurrentDatabase, "test1")
+      val table2 = QualifiedTableName(catalog.getCurrentDatabase, "test2")
+      val table3 = QualifiedTableName(catalog.getCurrentDatabase, "test3")
+
+      // First, make sure the test tables are not cached.
+      assert(catalog.getCachedTable(table1) === null)
+      assert(catalog.getCachedTable(table2) === null)
+      assert(catalog.getCachedTable(table3) === null)
+
+      catalog.cacheTable(table1, TestCommand(), 100000)
+      assert(catalog.getCachedTable(table1) !== null)
+
+      val plan = catalog.getCachedPlan(table2, () => {
+        (TestCommand(), 100000)
+      })
+      assert(catalog.getCachedTable(table2) !== null)
+
+      catalog.getCachedPlan(table3, () => {
+        (TestCommand(), 1000000)
+      })
+      assert(catalog.getCachedTable(table3) === null)
+    }
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 18 additions & 1 deletion
@@ -48,13 +48,15 @@ import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, Aggr
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.datasources.DataSourceAnalysis.estimateRelationPlanSize
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
 import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.SizeEstimator
 
 /**
  * Replaces generic operations with specific variants that are designed to work with Spark
@@ -230,6 +232,19 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
     }
     insertCommand
   }
+
+  // Only consider the size of the FileStatus listing cached in the HadoopFsRelation's file index.
+  def estimateRelationPlanSize(logicalRelation: LogicalRelation, defaultPlanSize: Int): Int = {
+    var size = logicalRelation match {
+      case LogicalRelation(HadoopFsRelation(l: InMemoryFileIndex, _, _, _, _, _), _, _, _) =>
+        SizeEstimator.estimate(l.leafFiles)
+      case _ => defaultPlanSize
+    }
+    if (size > Int.MaxValue) {
+      size = Int.MaxValue
+    }
+    size.toInt
+  }
 }
 
@@ -257,7 +272,9 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
           className = table.provider.get,
           options = dsOptions,
           catalogTable = Some(table))
-        LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
+        val rel = LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), table)
+        val estimateSize = estimateRelationPlanSize(rel, catalog.defaultPlanCacheSize)
+        (rel, estimateSize)
       })
   }
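The heart of estimateRelationPlanSize is SizeEstimator.estimate over the file index's in-memory leaf-file listing, clamped to Int.MaxValue because Guava weights are Ints. A standalone sketch of the idea, with a made-up FakeFileStatus case class standing in for Hadoop's FileStatus (the assumption being that SizeEstimator simply walks whatever object graph it is given):

import org.apache.spark.util.SizeEstimator

// Stand-in for org.apache.hadoop.fs.FileStatus, purely for illustration.
case class FakeFileStatus(path: String, length: Long, modificationTime: Long)

object EstimateSketch {
  def main(args: Array[String]): Unit = {
    // Pretend the table listing holds 10,000 leaf files of 128 MiB each.
    val leafFiles = (1 to 10000).map { i =>
      FakeFileStatus(s"/warehouse/db.db/tbl/part-$i.parquet", 128L * 1024 * 1024, 0L)
    }
    val estimated: Long = SizeEstimator.estimate(leafFiles)
    // Weigher.weigh returns an Int, so clamp exactly as the patch does.
    val weight: Int = math.min(estimated, Int.MaxValue.toLong).toInt
    println(s"estimated=$estimated bytes, cache weight=$weight")
  }
}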

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 5 additions & 3 deletions
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.DataSourceAnalysis.estimateRelationPlanSize
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
@@ -260,7 +261,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         fileFormat = fileFormat,
         options = enableDynamicPartition)(sparkSession = sparkSession)
       val created = LogicalRelation(fsRelation, updatedTable)
-      catalogProxy.cacheTable(tableIdentifier, created)
+      val estimateSize = estimateRelationPlanSize(created, catalogProxy.defaultPlanCacheSize)
+      catalogProxy.cacheTable(tableIdentifier, created, estimateSize)
       created
     }
 
@@ -289,8 +291,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
           options = hiveOptions.filter { case (k, _) => !k.equalsIgnoreCase("path") },
           className = fileType).resolveRelation(),
         table = updatedTable)
-
-      catalogProxy.cacheTable(tableIdentifier, created)
+      val estimateSize = estimateRelationPlanSize(created, catalogProxy.defaultPlanCacheSize)
+      catalogProxy.cacheTable(tableIdentifier, created, estimateSize)
       created
     }
