diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index e894f39d9270b..2a3c26561fcb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -113,6 +113,9 @@ private[sql] class SharedState( val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore] val listener = new SQLAppStatusListener(conf, kvStore, live = true) sparkContext.listenerBus.addToStatusQueue(listener) + sparkContext.cleaner.foreach { cleaner => + cleaner.registerSparkListenerForCleanup(this, listener) + } val statusStore = new SQLAppStatusStore(kvStore, Some(listener)) sparkContext.ui.foreach(new SQLTab(statusStore, _)) statusStore diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala index 0a7c684a68955..715f9184ba133 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala @@ -28,6 +28,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite} import org.apache.spark.internal.config.EXECUTOR_ALLOW_SPARK_CONTEXT import org.apache.spark.internal.config.UI.UI_ENABLED +import org.apache.spark.sql.execution.ui.SQLAppStatusListener import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf._ import org.apache.spark.sql.util.ExecutionListenerBus @@ -74,6 +75,41 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach wit } } + test("SPARK-32165: Fix memory leak of SQLAppStatusListener") { + val conf = new SparkConf() + .setMaster("local") + .setAppName("test-app-SPARK-32165") + val context = new SparkContext(conf) + + @inline def listenersNum(): Int = { + context + .listenerBus + .listeners + .asScala + .count(_.isInstanceOf[SQLAppStatusListener]) + } + + (1 to 10).foreach { _ => + SparkSession + .builder() + .sparkContext(context) + .master("local") + .getOrCreate() + .sessionState // this touches the sessionState + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + } + + eventually(timeout(10.seconds), interval(1.seconds)) { + System.gc() + // After GC, the number of SQLAppStatusListener should be less than 10 (we created 10 + // SparkSessions in total). + // Since GC can't 100% guarantee all out-of-referenced objects be cleaned at one time, + // here, we check at least one listener is cleaned up to prove the mechanism works. + assert(listenersNum() < 10) + } + } + test("create with config options and propagate them to SparkContext and SparkSession") { val session = SparkSession.builder() .master("local")