@@ -118,15 +118,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
118118 override def getAppUI (appId : String ): Option [SparkUI ] = {
119119 try {
120120 applications.get(appId).map { info =>
121- val ( replayBus, appListener) = createReplayBus ()
121+ val replayBus = new ReplayListenerBus ()
122122 val ui = {
123123 val conf = this .conf.clone()
124124 val appSecManager = new SecurityManager (conf)
125125 new SparkUI (conf, appSecManager, replayBus, appId,
126126 s " ${HistoryServer .UI_PATH_PREFIX }/ $appId" )
127127 // Do not call ui.bind() to avoid creating a new server for each application
128128 }
129- replayEvents (fs.getFileStatus(new Path (logDir, info.logPath)), replayBus)
129+ val appListener = replay (fs.getFileStatus(new Path (logDir, info.logPath)), replayBus)
130130
131131 ui.setAppName(s " ${appListener.appName.getOrElse(NOT_STARTED )} ( $appId) " )
132132
@@ -184,8 +184,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
184184 }
185185 .flatMap { entry =>
186186 try {
187- val (replayBus, appListener) = createReplayBus()
188- replayEvents(entry, replayBus)
187+ val appListener = replay(entry, new ReplayListenerBus ())
189188 Some (new FsApplicationHistoryInfo (
190189 entry.getPath().getName(),
191190 appListener.appId.getOrElse(entry.getPath().getName()),
@@ -232,22 +231,18 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
232231 }
233232 }
234233
235- private def createReplayBus (): (ReplayListenerBus , ApplicationEventListener ) = {
236- val replayBus = new ReplayListenerBus ()
237- val appListener = new ApplicationEventListener
238- replayBus.addListener(appListener)
239- (replayBus, appListener)
240- }
241-
242- private def replayEvents (logPath : FileStatus , bus : ReplayListenerBus ) = {
234+ private def replay (logPath : FileStatus , bus : ReplayListenerBus ): ApplicationEventListener = {
243235 val (logInput, sparkVersion) =
244236 if (logPath.isDir()) {
245237 openOldLog(logPath.getPath())
246238 } else {
247239 EventLoggingListener .openEventLog(logPath.getPath(), fs)
248240 }
249241 try {
242+ val appListener = new ApplicationEventListener
243+ bus.addListener(appListener)
250244 bus.replay(logInput, sparkVersion)
245+ appListener
251246 } finally {
252247 logInput.close()
253248 }
0 commit comments