@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
 
+  private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
+  private[spark] val eventLogDir: Option[String] = {
+    if (isEventLogEnabled) {
+      Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+    } else {
+      None
+    }
+  }
+
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
   val tachyonFolderName = "spark-" + randomUUID.toString()
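The new `eventLogDir` is resolved once up front, so the `EventLoggingListener` constructed later in this diff sees the same normalized value. A minimal usage sketch, not part of this diff (the directory path is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Enable event logging before constructing the context; the trailing "/"
// is stripped by the eventLogDir normalization added above.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("event-log-demo")
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "/tmp/spark-events/")

val sc = new SparkContext(conf)
```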
@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val listenerBus = new LiveListenerBus
 
   // Create the Spark execution environment (cache, map output tracker, etc)
+  conf.set("spark.executor.id", "driver")
   private[spark] val env = SparkEnv.create(
     conf,
     "<driver>",
@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
-  // Optionally log Spark events
-  private[spark] val eventLogger: Option[EventLoggingListener] = {
-    if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
-      logger.start()
-      listenerBus.addListener(logger)
-      Some(logger)
-    } else None
-  }
-
-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
   val startTime = System.currentTimeMillis()
 
   // Add each JAR given through the constructor
@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
   // constructor
   taskScheduler.start()
 
+  val applicationId: String = taskScheduler.applicationId()
+  conf.set("spark.app.id", applicationId)
+
+  val metricsSystem = env.metricsSystem
+
+  // The driver's metrics system needs spark.app.id to be set, so it must start
+  // only after we get the app ID from the task scheduler and set it above.
+  metricsSystem.start()
+
+  // Optionally log Spark events
+  private[spark] val eventLogger: Option[EventLoggingListener] = {
+    if (isEventLogEnabled) {
+      val logger =
+        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
+      logger.start()
+      listenerBus.addListener(logger)
+      Some(logger)
+    } else None
+  }
+
+  // At this point, all relevant SparkListeners have been registered, so begin releasing events
+  listenerBus.start()
+
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
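Because `spark.app.id` is now set on the conf during construction, user code can read the cluster-manager-assigned ID afterwards. A small sketch, assuming the `sc` from the earlier example:

```scala
// spark.app.id was populated from taskScheduler.applicationId() above.
val appId = sc.getConf.get("spark.app.id")
println(s"running as application $appId")
```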
@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
   // Post init
   taskScheduler.postStartHook()
 
-  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
-  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
+  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
 
   private def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -759,20 +779,20 @@ class SparkContext(config: SparkConf) extends Logging {
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
    * with `+=`. Only the driver can access the accumulable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
    */
-  def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param)
 
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
    * Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver can
    * access the accumulable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
+   * @tparam R accumulator result type
+   * @tparam T type that can be added to the accumulator
    */
-  def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
+  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) =
     new Accumulable(initialValue, param, Some(name))
 
   /**
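The swap gives the conventional order `Accumulable[R, T]`: `R` is the accumulated result type and `T` is the element type merged in with `+=`. A hedged usage sketch against the corrected signature; `ListParam` is invented here for illustration and an existing `sc` is assumed:

```scala
import org.apache.spark.AccumulableParam

// Illustrative param that accumulates Ints into a List[Int].
implicit object ListParam extends AccumulableParam[List[Int], Int] {
  def addAccumulator(acc: List[Int], elem: Int): List[Int] = elem :: acc
  def addInPlace(a: List[Int], b: List[Int]): List[Int] = a ::: b
  def zero(initial: List[Int]): List[Int] = Nil
}

val listAcc = sc.accumulable[List[Int], Int](Nil)
sc.parallelize(1 to 3).foreach(x => listAcc += x)
listAcc.value  // readable only on the driver
```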
@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private def postApplicationStart() {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
-    listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
       startTime, sparkUser))
   }
 
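With the event now carrying `Some(applicationId)`, a listener can rely on the ID being present whenever the cluster manager supplies one. A sketch of consuming it (field names assumed to match this commit's `SparkListenerApplicationStart`):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

class AppIdLogger extends SparkListener {
  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    val id = event.appId.getOrElse("n/a")
    println(s"application started: ${event.appName} (id=$id)")
  }
}

sc.addSparkListener(new AppIdLogger)
```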