Skip to content

Commit 1df3358

Browse files
wakunGitHub Enterprise
authored andcommitted
[CARMEL-7550][CARMEL-4895] ExternalCatalogListener make SparkSession leak (apache#222)
1 parent 3662848 commit 1df3358

File tree

2 files changed

+5
-0
lines changed

2 files changed

+5
-0
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,10 @@ class SessionCatalog(
253253
private val listener = new ExternalCatalogListener(this)
254254
liveBus.map(_.addToExternalCatalogQueue(listener))
255255

256+
def removeCatalogListener(): Unit = {
257+
liveBus.foreach(_.removeListener(listener))
258+
}
259+
256260
/** This method provides a way to get a cached plan. */
257261
def getCachedPlan(t: QualifiedTableName, c: Callable[(LogicalPlan, Int)]): LogicalPlan = {
258262
tableRelationCache.get(t, c)._1

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext:
122122
ctx.sparkSession.scratchSessionPath.foreach { p =>
123123
SparkHadoopUtil.deletePath(p, ctx.sparkSession.sparkContext.hadoopConfiguration)
124124
}
125+
ctx.sparkSession.sessionState.catalog.removeCatalogListener()
125126
super.closeSession(sessionHandle)
126127
sparkSqlOperationManager.sessionToContexts.remove(sessionHandle)
127128
AnalyticsTaskSchedulerImpl.activeSessions.decrementAndGet()

0 commit comments

Comments
 (0)