15 changes: 13 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -29,7 +29,7 @@ import java.util.UUID.randomUUID
import scala.collection.JavaConverters._
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ListBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}
import scala.util.control.NonFatal

@@ -241,6 +241,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private val _stopHooks = new ListBuffer[() => Unit]()

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -1618,6 +1619,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
listenerBus.post(SparkListenerUnpersistRDD(rddId))
}

/**
* Adds a stop hook that can be used to clean up additional resources. Stop hooks are invoked
* when the SparkContext is being stopped.
*/
private[spark] def addStopHook(hook: () => Unit): Unit = {
Contributor
Does this method (or the collection that it manipulates) need to be synchronized in order to be thread-safe?

_stopHooks += hook
}
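
On the thread-safety question above: ListBuffer is not thread-safe, so if hooks can be registered from multiple threads the buffer needs guarding. A minimal sketch of one way to do that, with a hypothetical runStopHooks helper; this is an illustration, not the code in the PR:

private[spark] def addStopHook(hook: () => Unit): Unit = _stopHooks.synchronized {
  _stopHooks += hook
}

// Hypothetical helper for stop(): snapshot the hooks under the lock, then run
// them outside it so a slow hook cannot block concurrent registration.
private def runStopHooks(): Unit = {
  val hooks = _stopHooks.synchronized { _stopHooks.toList }
  hooks.foreach(hook => Utils.tryLogNonFatalError { hook() })
}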

/**
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
@@ -1764,10 +1773,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Unset YARN mode system env variable, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
SparkContext.clearActiveContext()
_stopHooks.foreach(hook => Utils.tryLogNonFatalError {
hook()
})
logInfo("Successfully stopped SparkContext")
}


/**
* Get Spark's home location from either a value set through the constructor,
* or the spark.home Java property, or the SPARK_HOME environment variable
6 changes: 6 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -78,6 +78,12 @@ class SQLContext private[sql](
}
def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)

sparkContext.addStopHook(() => {
SQLContext.clearInstantiatedContext()
SQLContext.clearActive()
SQLContext.clearSqlListener()
})
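
For illustration, the lifecycle this registration enables looks roughly like the following; a sketch, not code from the PR:

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("demo"))
val sqlContext = new SQLContext(sc)  // the constructor registers the stop hook above
// ... run SQL workloads ...
sc.stop()  // the hook fires and clears the instantiated/active/listener references,
           // so the shared SQLListener becomes eligible for garbage collection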

// If spark.sql.allowMultipleContexts is true, we will throw an exception if a user
// wants to create a new root SQLContext (a SQLContext that is not created by newSession).
private val allowMultipleContexts =
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -343,6 +343,8 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
.set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
val sc = new SparkContext(conf)
try {
// Clear the sql listener created by a previous test suite.
SQLContext.clearSqlListener()
Member
This is not a public API, so the user cannot clear SQLContext.sqlListener? This will be a memory leak, considering SQLListener usually stores a lot of data.

Contributor Author
Previously each SQLContext had its own sqlListener. Because the SQL events are now posted to the event bus, all SQLContexts share a single sqlListener. I don't think a user needs to clear SQLContext.sqlListener; it is only used by the unit tests.
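
In other words, the first SQLContext installs a single listener on the SparkContext's listener bus and later instances reuse it. A rough sketch of that pattern (the method name and the SQLListener(conf) constructor are assumptions, not the exact Spark source):

import java.util.concurrent.atomic.AtomicReference

private val sqlListener = new AtomicReference[SQLListener]()

// The first caller creates and installs the listener; everyone else reuses it.
private[sql] def sharedSqlListener(sc: SparkContext): SQLListener = {
  if (sqlListener.get() == null) {
    val listener = new SQLListener(sc.conf)  // constructor signature assumed
    if (sqlListener.compareAndSet(null, listener)) {
      sc.addSparkListener(listener)
    }
  }
  sqlListener.get()
}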

Member

SPARK-11700 is a bit different, but my point is that we should not keep a big object in memory without providing a way to clean it up. In some use cases, Spark SQL may be just one of several ETL steps, and when users finish their Spark SQL work, they usually want to clean up all resources used by the SparkContext/SQLContext.

Contributor Author
I see. Is it enough to make SQLContext.clearSqlListener public here? That would give users who want the object to be GCed a way to clear the reference.

Contributor
I can imagine Zeppelin wanting to purge these, or whatever Spark Kernel is called these days.

Contributor Author
I think we can add a SparkContext stop hook that clears the reference when the SparkContext is stopped. The user then doesn't have to call a method to clear the sqlListener reference: the sqlListener is tied to the SparkContext and becomes eligible for garbage collection once the SparkContext is stopped.

Contributor
Over on the original PR, I commented to ask why SQLContext.sqlListener needs to be an AtomicReference[SQLListener] instead of an AtomicBoolean or some other sort of atomic primitive. As far as I can tell, we never access any methods or fields of the sqlListener that's stored here, so if we only need to set something for compare-and-swap purposes then I think we shouldn't use an AtomicReference, thereby avoiding the GC issues that it causes.
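
A sketch of that suggested alternative, assuming the reference really is only needed to guard one-time installation (method name hypothetical, SQLListener constructor assumed):

import java.util.concurrent.atomic.AtomicBoolean

private val sqlListenerInstalled = new AtomicBoolean(false)

// The flag only records whether a listener has been installed; no SQLListener
// instance is pinned by a static field, so the listener lives and dies with
// the SparkContext's listener bus.
private[sql] def installSqlListenerOnce(sc: SparkContext): Unit = {
  if (sqlListenerInstalled.compareAndSet(false, true)) {
    sc.addSparkListener(new SQLListener(sc.conf))  // constructor signature assumed
  }
}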

Contributor Author
Many unit tests use sqlContext.listener. Can you please suggest how to update the unit tests if we change to an AtomicBoolean?

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// Run 100 successful executions and 100 failed executions.