@@ -32,16 +32,20 @@ import org.apache.spark.deploy.SparkHadoopUtil
3232import org .apache .spark .io .CompressionCodec
3333import org .apache .spark .scheduler ._
3434import org .apache .spark .ui .SparkUI
35- import org .apache .spark .util .{ThreadUtils , Utils }
35+ import org .apache .spark .util .{Clock , SystemClock , ThreadUtils , Utils }
3636import org .apache .spark .{Logging , SecurityManager , SparkConf }
3737
3838/**
3939 * A class that provides application history from event logs stored in the file system.
4040 * This provider checks for new finished applications in the background periodically and
4141 * renders the history application UI by parsing the associated event logs.
4242 */
43- private [history] class FsHistoryProvider (conf : SparkConf ) extends ApplicationHistoryProvider
44- with Logging {
43+ private [history] class FsHistoryProvider (conf : SparkConf , clock : Clock )
44+ extends ApplicationHistoryProvider with Logging {
45+
46+ def this (conf : SparkConf ) = {
47+ this (conf, new SystemClock ())
48+ }
4549
4650 import FsHistoryProvider ._
4751
@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
7579 @ volatile private var applications : mutable.LinkedHashMap [String , FsApplicationHistoryInfo ]
7680 = new mutable.LinkedHashMap ()
7781
78- // List of applications to be deleted by event log cleaner.
79- private var appsToClean = new mutable.ListBuffer [FsApplicationHistoryInfo ]
82+ // List of application logs to be deleted by event log cleaner.
83+ private var attemptsToClean = new mutable.ListBuffer [FsApplicationAttemptInfo ]
8084
8185 // Constants used to parse Spark 1.0.0 log directories.
8286 private [history] val LOG_PREFIX = " EVENT_LOG_"
@@ -138,31 +142,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
138142
139143 override def getListing (): Iterable [FsApplicationHistoryInfo ] = applications.values
140144
141- override def getAppUI (appId : String ): Option [SparkUI ] = {
145+ override def getAppUI (appId : String , attemptId : Option [ String ] ): Option [SparkUI ] = {
142146 try {
143- applications.get(appId).map { info =>
144- val replayBus = new ReplayListenerBus ()
145- val ui = {
146- val conf = this .conf.clone()
147- val appSecManager = new SecurityManager (conf)
148- SparkUI .createHistoryUI(conf, replayBus, appSecManager, appId,
149- s " ${HistoryServer .UI_PATH_PREFIX }/ $appId" )
150- // Do not call ui.bind() to avoid creating a new server for each application
151- }
147+ applications.get(appId).flatMap { appInfo =>
148+ appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
149+ val replayBus = new ReplayListenerBus ()
150+ val ui = {
151+ val conf = this .conf.clone()
152+ val appSecManager = new SecurityManager (conf)
153+ SparkUI .createHistoryUI(conf, replayBus, appSecManager, appId,
154+ HistoryServer .getAttemptURI(appId, attempt.attemptId))
155+ // Do not call ui.bind() to avoid creating a new server for each application
156+ }
152157
153- val appListener = new ApplicationEventListener ()
154- replayBus.addListener(appListener)
155- val appInfo = replay(fs.getFileStatus(new Path (logDir, info .logPath)), replayBus)
158+ val appListener = new ApplicationEventListener ()
159+ replayBus.addListener(appListener)
160+ val appInfo = replay(fs.getFileStatus(new Path (logDir, attempt .logPath)), replayBus)
156161
157- ui.setAppName(s " ${appInfo.name} ( $appId) " )
162+ ui.setAppName(s " ${appInfo.name} ( $appId) " )
158163
159- val uiAclsEnabled = conf.getBoolean(" spark.history.ui.acls.enable" , false )
160- ui.getSecurityManager.setAcls(uiAclsEnabled)
161- // make sure to set admin acls before view acls so they are properly picked up
162- ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(" " ))
163- ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
164- appListener.viewAcls.getOrElse(" " ))
165- ui
164+ val uiAclsEnabled = conf.getBoolean(" spark.history.ui.acls.enable" , false )
165+ ui.getSecurityManager.setAcls(uiAclsEnabled)
166+ // make sure to set admin acls before view acls so they are properly picked up
167+ ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(" " ))
168+ ui.getSecurityManager.setViewAcls(attempt.sparkUser,
169+ appListener.viewAcls.getOrElse(" " ))
170+ ui
171+ }
166172 }
167173 } catch {
168174 case e : FileNotFoundException => None
@@ -220,7 +226,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
220226 */
221227 private def mergeApplicationListing (logs : Seq [FileStatus ]): Unit = {
222228 val bus = new ReplayListenerBus ()
223- val newApps = logs.flatMap { fileStatus =>
229+ val newAttempts = logs.flatMap { fileStatus =>
224230 try {
225231 val res = replay(fileStatus, bus)
226232 logInfo(s " Application log ${res.logPath} loaded successfully. " )
@@ -232,76 +238,104 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
232238 e)
233239 None
234240 }
235- }.toSeq.sortWith(compareAppInfo)
236-
237- // When there are new logs, merge the new list with the existing one, maintaining
238- // the expected ordering (descending end time). Maintaining the order is important
239- // to avoid having to sort the list every time there is a request for the log list.
240- if (newApps.nonEmpty) {
241- val mergedApps = new mutable.LinkedHashMap [String , FsApplicationHistoryInfo ]()
242- def addIfAbsent (info : FsApplicationHistoryInfo ): Unit = {
243- if (! mergedApps.contains(info.id) ||
244- mergedApps(info.id).logPath.endsWith(EventLoggingListener .IN_PROGRESS ) &&
245- ! info.logPath.endsWith(EventLoggingListener .IN_PROGRESS )) {
246- mergedApps += (info.id -> info)
247- }
248- }
241+ }
249242
250- val newIterator = newApps.iterator.buffered
251- val oldIterator = applications.values.iterator.buffered
252- while (newIterator.hasNext && oldIterator.hasNext) {
253- if (compareAppInfo(newIterator.head, oldIterator.head)) {
254- addIfAbsent(newIterator.next())
255- } else {
256- addIfAbsent(oldIterator.next())
243+ if (newAttempts.isEmpty) {
244+ return
245+ }
246+
247+ // Build a map containing all apps that contain new attempts. The app information in this map
248+ // contains both the new app attempt, and those that were already loaded in the existing apps
249+ // map. If an attempt has been updated, it replaces the old attempt in the list.
250+ val newAppMap = new mutable.HashMap [String , FsApplicationHistoryInfo ]()
251+ newAttempts.foreach { attempt =>
252+ val appInfo = newAppMap.get(attempt.appId)
253+ .orElse(applications.get(attempt.appId))
254+ .map { app =>
255+ val attempts =
256+ app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List (attempt)
257+ new FsApplicationHistoryInfo (attempt.appId, attempt.name,
258+ attempts.sortWith(compareAttemptInfo))
257259 }
260+ .getOrElse(new FsApplicationHistoryInfo (attempt.appId, attempt.name, List (attempt)))
261+ newAppMap(attempt.appId) = appInfo
262+ }
263+
264+ // Merge the new app list with the existing one, maintaining the expected ordering (descending
265+ // end time). Maintaining the order is important to avoid having to sort the list every time
266+ // there is a request for the log list.
267+ val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
268+ val mergedApps = new mutable.LinkedHashMap [String , FsApplicationHistoryInfo ]()
269+ def addIfAbsent (info : FsApplicationHistoryInfo ): Unit = {
270+ if (! mergedApps.contains(info.id)) {
271+ mergedApps += (info.id -> info)
258272 }
259- newIterator.foreach(addIfAbsent)
260- oldIterator.foreach(addIfAbsent)
273+ }
261274
262- applications = mergedApps
275+ val newIterator = newApps.iterator.buffered
276+ val oldIterator = applications.values.iterator.buffered
277+ while (newIterator.hasNext && oldIterator.hasNext) {
278+ if (newAppMap.contains(oldIterator.head.id)) {
279+ oldIterator.next()
280+ } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
281+ addIfAbsent(newIterator.next())
282+ } else {
283+ addIfAbsent(oldIterator.next())
284+ }
263285 }
286+ newIterator.foreach(addIfAbsent)
287+ oldIterator.foreach(addIfAbsent)
288+
289+ applications = mergedApps
264290 }
265291
266292 /**
267293 * Delete event logs from the log directory according to the clean policy defined by the user.
268294 */
269- private def cleanLogs (): Unit = {
295+ private [history] def cleanLogs (): Unit = {
270296 try {
271297 val maxAge = conf.getTimeAsSeconds(" spark.history.fs.cleaner.maxAge" , " 7d" ) * 1000
272298
273- val now = System .currentTimeMillis ()
299+ val now = clock.getTimeMillis ()
274300 val appsToRetain = new mutable.LinkedHashMap [String , FsApplicationHistoryInfo ]()
275301
302+ def shouldClean (attempt : FsApplicationAttemptInfo ): Boolean = {
303+ now - attempt.lastUpdated > maxAge && attempt.completed
304+ }
305+
276306 // Scan all logs from the log directory.
277307 // Only completed applications older than the specified max age will be deleted.
278- applications.values.foreach { info =>
279- if (now - info.lastUpdated <= maxAge || ! info.completed) {
280- appsToRetain += (info.id -> info)
281- } else {
282- appsToClean += info
308+ applications.values.foreach { app =>
309+ val (toClean, toRetain) = app.attempts.partition(shouldClean)
310+ attemptsToClean ++= toClean
311+
312+ if (toClean.isEmpty) {
313+ appsToRetain += (app.id -> app)
314+ } else if (toRetain.nonEmpty) {
315+ appsToRetain += (app.id ->
316+ new FsApplicationHistoryInfo (app.id, app.name, toRetain.toList))
283317 }
284318 }
285319
286320 applications = appsToRetain
287321
288- val leftToClean = new mutable.ListBuffer [FsApplicationHistoryInfo ]
289- appsToClean .foreach { info =>
322+ val leftToClean = new mutable.ListBuffer [FsApplicationAttemptInfo ]
323+ attemptsToClean .foreach { attempt =>
290324 try {
291- val path = new Path (logDir, info .logPath)
325+ val path = new Path (logDir, attempt .logPath)
292326 if (fs.exists(path)) {
293327 fs.delete(path, true )
294328 }
295329 } catch {
296330 case e : AccessControlException =>
297- logInfo(s " No permission to delete ${info .logPath}, ignoring. " )
331+ logInfo(s " No permission to delete ${attempt .logPath}, ignoring. " )
298332 case t : IOException =>
299- logError(s " IOException in cleaning logs of ${info .logPath}" , t)
300- leftToClean += info
333+ logError(s " IOException in cleaning ${attempt .logPath}" , t)
334+ leftToClean += attempt
301335 }
302336 }
303337
304- appsToClean = leftToClean
338+ attemptsToClean = leftToClean
305339 } catch {
306340 case t : Exception => logError(" Exception in cleaning logs" , t)
307341 }
@@ -315,14 +349,36 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
315349 private def compareAppInfo (
316350 i1 : FsApplicationHistoryInfo ,
317351 i2 : FsApplicationHistoryInfo ): Boolean = {
318- if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
352+ val a1 = i1.attempts.head
353+ val a2 = i2.attempts.head
354+ if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
355+ }
356+
357+ /**
358+ * Comparison function that defines the sort order for application attempts within the same
359+ * application. Order is: running attempts before complete attempts, running attempts sorted
360+ * by start time, completed attempts sorted by end time.
361+ *
362+ * Normally applications should have a single running attempt; but failure to call sc.stop()
363+ * may cause multiple running attempts to show up.
364+ *
365+ * @return Whether `a1` should precede `a2`.
366+ */
367+ private def compareAttemptInfo (
368+ a1 : FsApplicationAttemptInfo ,
369+ a2 : FsApplicationAttemptInfo ): Boolean = {
370+ if (a1.completed == a2.completed) {
371+ if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
372+ } else {
373+ ! a1.completed
374+ }
319375 }
320376
321377 /**
322378 * Replays the events in the specified log file and returns information about the associated
323379 * application.
324380 */
325- private def replay (eventLog : FileStatus , bus : ReplayListenerBus ): FsApplicationHistoryInfo = {
381+ private def replay (eventLog : FileStatus , bus : ReplayListenerBus ): FsApplicationAttemptInfo = {
326382 val logPath = eventLog.getPath()
327383 logInfo(s " Replaying log path: $logPath" )
328384 val logInput =
@@ -336,10 +392,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
336392 val appCompleted = isApplicationCompleted(eventLog)
337393 bus.addListener(appListener)
338394 bus.replay(logInput, logPath.toString, ! appCompleted)
339- new FsApplicationHistoryInfo (
395+ new FsApplicationAttemptInfo (
340396 logPath.getName(),
341- appListener.appId.getOrElse(logPath.getName()),
342397 appListener.appName.getOrElse(NOT_STARTED ),
398+ appListener.appId.getOrElse(logPath.getName()),
399+ appListener.appAttemptId,
343400 appListener.startTime.getOrElse(- 1L ),
344401 appListener.endTime.getOrElse(- 1L ),
345402 getModificationTime(eventLog).get,
@@ -425,13 +482,21 @@ private object FsHistoryProvider {
425482 val DEFAULT_LOG_DIR = " file:/tmp/spark-events"
426483}
427484
428- private class FsApplicationHistoryInfo (
485+ private class FsApplicationAttemptInfo (
429486 val logPath : String ,
430- id : String ,
431- name : String ,
487+ val name : String ,
488+ val appId : String ,
489+ attemptId : Option [String ],
432490 startTime : Long ,
433491 endTime : Long ,
434492 lastUpdated : Long ,
435493 sparkUser : String ,
436494 completed : Boolean = true )
437- extends ApplicationHistoryInfo (id, name, startTime, endTime, lastUpdated, sparkUser, completed)
495+ extends ApplicationAttemptInfo (
496+ attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
497+
498+ private class FsApplicationHistoryInfo (
499+ id : String ,
500+ override val name : String ,
501+ override val attempts : List [FsApplicationAttemptInfo ])
502+ extends ApplicationHistoryInfo (id, name, attempts)
0 commit comments