Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, StandaloneSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
import org.apache.spark.status.AppStatusStore
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
Expand Down Expand Up @@ -213,6 +214,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _jars: Seq[String] = _
private var _files: Seq[String] = _
private var _shutdownHookRef: AnyRef = _
private var _statusStore: AppStatusStore = _

/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
Expand Down Expand Up @@ -421,6 +423,10 @@ class SparkContext(config: SparkConf) extends Logging {
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addToStatusQueue(jobProgressListener)

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
_statusStore = AppStatusStore.createLiveStore(conf, listenerBus)

// Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
Expand All @@ -442,8 +448,12 @@ class SparkContext(config: SparkConf) extends Logging {

_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
Some(SparkUI.create(Some(this), _statusStore, _conf,
l => listenerBus.addToStatusQueue(l),
_env.securityManager,
appName,
"",
startTime))
} else {
// For tests, do not enable the UI
None
Expand Down Expand Up @@ -1939,6 +1949,9 @@ class SparkContext(config: SparkConf) extends Logging {
}
SparkEnv.set(null)
}
if (_statusStore != null) {
_statusStore.close()
}
// Clear this `InheritableThreadLocal`, or it will still be inherited in child threads even this
// `SparkContext` is stopped.
localProperties.remove()
Expand Down
Loading