@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
 import java.util.{Date, UUID}
-import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
+import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
@@ -28,7 +28,7 @@ import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.google.common.io.ByteStreams
-import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder}
+import com.google.common.util.concurrent.MoreExecutors
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.fs.permission.FsAction
 import org.apache.hadoop.hdfs.DistributedFileSystem
@@ -114,8 +114,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // Used by check event thread and clean log thread.
   // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
   // and applications between check task and clean task.
-  private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
-    .setNameFormat("spark-history-task-%d").setDaemon(true).build())
+  private val pool = ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-history-task-%d")
 
   // The modification time of the newest log detected during the last scan. Currently only
   // used for logging msgs (logs are re-scanned based on file size, rather than modtime)
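Note on the change above: the `ThreadUtils.newDaemonSingleThreadScheduledExecutor` call on the `+` line stands in for the Guava-based construction on the removed lines. As a hedged sketch, the helper can be thought of as wrapping the same ThreadFactoryBuilder pattern (the actual ThreadUtils implementation in Spark may differ in details):

import java.util.concurrent.{Executors, ScheduledExecutorService}
import com.google.common.util.concurrent.ThreadFactoryBuilder

// Sketch only: a single-thread scheduled executor whose worker thread is a daemon
// and carries the given name format, mirroring the removed inline construction.
def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
  val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
  Executors.newSingleThreadScheduledExecutor(threadFactory)
}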
@@ -167,7 +166,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    * Fixed size thread pool to fetch and parse log files.
    */
   private val replayExecutor: ExecutorService = {
-    if (!conf.contains("spark.testing")) {
+    if (!Utils.isTesting) {
       ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
     } else {
       MoreExecutors.sameThreadExecutor()
@@ -262,6 +261,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .iterator()
       .asScala
       .map(_.toAppHistoryInfo())
+      .toList
+      .iterator
   }
 
   override def getApplicationInfo(appId: String): Option[ApplicationHistoryInfo] = {
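The added `.toList` / `.iterator` pair eagerly copies the view's entries before returning. A minimal sketch of the difference, under the assumption that the intent is to avoid handing callers an iterator that is still lazily backed by the underlying KVStore view:

// A mapped iterator performs its work only as it is consumed...
val lazyIter: Iterator[String] = Iterator(1, 2, 3).map(n => s"app-$n")
// ...whereas .toList forces everything up front and .iterator then walks the finished copy.
val materializedIter: Iterator[String] = lazyIter.toList.iterator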
@@ -406,49 +407,71 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       val newLastScanTime = getNewLastScanTime()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-      // scan for modified applications, replay and merge them
-      val logInfos = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+      val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry =>
           !entry.isDirectory() &&
             // FsHistoryProvider generates a hidden file which can't be read. Accidentally
             // reading a garbage file is safe, but we would log an error which can be scary to
             // the end-user.
            !entry.getPath().getName().startsWith(".") &&
-            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
-            recordedFileSize(entry.getPath()) < entry.getLen()
+            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+        }
+        .filter { entry =>
+          try {
+            val info = listing.read(classOf[LogInfo], entry.getPath().toString())
+            if (info.fileSize < entry.getLen()) {
+              // Log size has changed, it should be parsed.
+              true
+            } else {
+              // If SHS view has a valid application, update the time the file was last seen so that
+              // the entry is not deleted from the SHS listing.
+              if (info.appId.isDefined) {
+                listing.write(new LogInfo(info.logPath, newLastScanTime, info.appId, info.attemptId,
+                  info.fileSize))
+              }
+              false
+            }
+          } catch {
+            case _: NoSuchElementException =>
+              // If the file is currently not being tracked by the SHS, add an entry for it and try
+              // to parse it. This will allow the cleaner code to detect the file as stale later on
+              // if it was not possible to parse it.
+              listing.write(new LogInfo(entry.getPath().toString(), newLastScanTime, None, None,
+                entry.getLen()))
+              entry.getLen() > 0
+          }
         }
         .sortWith { case (entry1, entry2) =>
           entry1.getModificationTime() > entry2.getModificationTime()
         }
 
-      if (logInfos.nonEmpty) {
-        logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
+      if (updated.nonEmpty) {
+        logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
       }
 
-      var tasks = mutable.ListBuffer[Future[_]]()
-
-      try {
-        for (file <- logInfos) {
-          tasks += replayExecutor.submit(new Runnable {
-            override def run(): Unit = mergeApplicationListing(file)
+      val tasks = updated.map { entry =>
+        try {
+          replayExecutor.submit(new Runnable {
+            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
           })
+        } catch {
+          // let the iteration over logInfos break, since an exception on
+          // replayExecutor.submit (..) indicates the ExecutorService is unable
+          // to take any more submissions at this time
+          case e: Exception =>
+            logError(s"Exception while submitting event log for replay", e)
+            null
         }
-      } catch {
-        // let the iteration over logInfos break, since an exception on
-        // replayExecutor.submit (..) indicates the ExecutorService is unable
-        // to take any more submissions at this time
-
-        case e: Exception =>
-          logError(s"Exception while submitting event log for replay", e)
-      }
+      }.filter(_ != null)
 
       pendingReplayTasksCount.addAndGet(tasks.size)
 
+      // Wait for all tasks to finish. This makes sure that checkForLogs
+      // is not scheduled again while some tasks are already running in
+      // the replayExecutor.
       tasks.foreach { task =>
         try {
-          // Wait for all tasks to finish. This makes sure that checkForLogs
-          // is not scheduled again while some tasks are already running in
-          // the replayExecutor.
           task.get()
         } catch {
           case e: InterruptedException =>
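The new catch branch above maps a failed submission to null, and `.filter(_ != null)` drops it; a submission typically fails with a RejectedExecutionException once the executor can no longer accept work. A small self-contained sketch of that failure mode (a hypothetical pool, not the history server's replayExecutor):

import java.util.concurrent.{Executors, RejectedExecutionException}

val pool = Executors.newFixedThreadPool(1)
pool.shutdown()  // a shut-down executor rejects any further submissions
try {
  pool.submit(new Runnable { override def run(): Unit = () })
} catch {
  case _: RejectedExecutionException => println("submission rejected, stop submitting")
}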
@@ -460,13 +483,70 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         }
       }
 
+      // Delete all information about applications whose log files disappeared from storage.
+      // This is done by identifying the event logs which were not touched by the current
+      // directory scan.
+      //
+      // Only entries with valid applications are cleaned up here. Cleaning up invalid log
+      // files is done by the periodic cleaner task.
+      val stale = listing.view(classOf[LogInfo])
+        .index("lastProcessed")
+        .last(newLastScanTime - 1)
+        .asScala
+        .toList
+      stale.foreach { log =>
+        log.appId.foreach { appId =>
+          cleanAppData(appId, log.attemptId, log.logPath)
+          listing.delete(classOf[LogInfo], log.logPath)
+        }
+      }
+
       lastScanTime.set(newLastScanTime)
     } catch {
       case e: Exception => logError("Exception in checking for event log updates", e)
     }
   }
 
-  private def getNewLastScanTime(): Long = {
+  private def cleanAppData(appId: String, attemptId: Option[String], logPath: String): Unit = {
+    try {
+      val app = load(appId)
+      val (attempt, others) = app.attempts.partition(_.info.attemptId == attemptId)
+
+      assert(attempt.isEmpty || attempt.size == 1)
+      val isStale = attempt.headOption.exists { a =>
+        if (a.logPath != new Path(logPath).getName()) {
+          // If the log file name does not match, then probably the old log file was from an
+          // in progress application. Just return that the app should be left alone.
+          false
+        } else {
+          val maybeUI = synchronized {
+            activeUIs.remove(appId -> attemptId)
+          }
+
+          maybeUI.foreach { ui =>
+            ui.invalidate()
+            ui.ui.store.close()
+          }
+
+          storeManager.foreach(_.release(appId, attemptId, delete = true))
+          true
+        }
+      }
+
+      if (isStale) {
+        if (others.nonEmpty) {
+          val newAppInfo = new ApplicationInfoWrapper(app.info, others)
+          listing.write(newAppInfo)
+        } else {
+          listing.delete(classOf[ApplicationInfoWrapper], appId)
+        }
+      }
+    } catch {
+      case _: NoSuchElementException =>
+    }
+  }
+
+  private[history] def getNewLastScanTime(): Long = {
     val fileName = "." + UUID.randomUUID().toString
     val path = new Path(logDir, fileName)
     val fos = fs.create(path)
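The `stale` query added above selects LogInfo entries whose `lastProcessed` index value is older than the current scan's timestamp (assuming `last(newLastScanTime - 1)` is inclusive of the given value): every log seen by the scan is re-written with `newLastScanTime`, so anything left behind was not touched. A hedged sketch of the same selection using plain collections instead of the KVStore view:

// Stand-in for the KVStore-backed LogInfo entries; field names follow the diff above.
case class LogEntry(logPath: String, lastProcessed: Long, appId: Option[String])

val newLastScanTime = 20L
val entries = Seq(
  LogEntry("eventlog-app-1", lastProcessed = 20L, appId = Some("app-1")),  // touched by this scan
  LogEntry("eventlog-app-2", lastProcessed = 10L, appId = Some("app-2")))  // not touched: stale

// Rough equivalent of .index("lastProcessed").last(newLastScanTime - 1)
val stale = entries.filter(_.lastProcessed <= newLastScanTime - 1)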
@@ -531,7 +611,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the given log file, saving the application in the listing db.
    */
-  protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+  protected def mergeApplicationListing(fileStatus: FileStatus, scanTime: Long): Unit = {
     val eventsFilter: ReplayEventsFilter = { eventString =>
       eventString.startsWith(APPL_START_EVENT_PREFIX) ||
       eventString.startsWith(APPL_END_EVENT_PREFIX) ||
@@ -546,73 +626,75 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     replay(fileStatus, bus, eventsFilter = eventsFilter)
 
-    listener.applicationInfo.foreach { app =>
-      // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
-      // discussion on the UI lifecycle.
-      synchronized {
-        activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
-          ui.invalidate()
-          ui.ui.store.close()
+    val (appId, attemptId) = listener.applicationInfo match {
+      case Some(app) =>
+        // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
+        // discussion on the UI lifecycle.
+        synchronized {
+          activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
+            ui.invalidate()
+            ui.ui.store.close()
+          }
         }
-      }
 
-      addListing(app)
+        addListing(app)
+        (Some(app.info.id), app.attempts.head.info.attemptId)
+
+      case _ =>
+        (None, None)
     }
-    listing.write(new LogInfo(logPath.toString(), fileStatus.getLen()))
+    listing.write(new LogInfo(logPath.toString(), scanTime, appId, attemptId, fileStatus.getLen()))
   }
 
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private[history] def cleanLogs(): Unit = {
-    var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
-    try {
-      val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
-
-      // Iterate descending over all applications whose oldest attempt happened before maxTime.
-      iterator = Some(listing.view(classOf[ApplicationInfoWrapper])
-        .index("oldestAttempt")
-        .reverse()
-        .first(maxTime)
-        .closeableIterator())
-
-      iterator.get.asScala.foreach { app =>
-        // Applications may have multiple attempts, some of which may not need to be deleted yet.
-        val (remaining, toDelete) = app.attempts.partition { attempt =>
-          attempt.info.lastUpdated.getTime() >= maxTime
-        }
+  private[history] def cleanLogs(): Unit = Utils.tryLog {
+    val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
 
-        if (remaining.nonEmpty) {
-          val newApp = new ApplicationInfoWrapper(app.info, remaining)
-          listing.write(newApp)
-        }
+    val expired = listing.view(classOf[ApplicationInfoWrapper])
+      .index("oldestAttempt")
+      .reverse()
+      .first(maxTime)
+      .asScala
+      .toList
+    expired.foreach { app =>
+      // Applications may have multiple attempts, some of which may not need to be deleted yet.
+      val (remaining, toDelete) = app.attempts.partition { attempt =>
+        attempt.info.lastUpdated.getTime() >= maxTime
+      }
 
-        toDelete.foreach { attempt =>
-          val logPath = new Path(logDir, attempt.logPath)
-          try {
-            listing.delete(classOf[LogInfo], logPath.toString())
-          } catch {
-            case _: NoSuchElementException =>
-              logDebug(s"Log info entry for $logPath not found.")
-          }
-          try {
-            fs.delete(logPath, true)
-          } catch {
-            case e: AccessControlException =>
-              logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
-            case t: IOException =>
-              logError(s"IOException in cleaning ${attempt.logPath}", t)
-          }
-        }
+      if (remaining.nonEmpty) {
+        val newApp = new ApplicationInfoWrapper(app.info, remaining)
+        listing.write(newApp)
+      }
 
-        if (remaining.isEmpty) {
-          listing.delete(app.getClass(), app.id)
-        }
+      toDelete.foreach { attempt =>
+        logInfo(s"Deleting expired event log for ${attempt.logPath}")
+        val logPath = new Path(logDir, attempt.logPath)
+        listing.delete(classOf[LogInfo], logPath.toString())
+        cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
+        deleteLog(logPath)
+      }
+
+      if (remaining.isEmpty) {
+        listing.delete(app.getClass(), app.id)
+      }
+    }
+
+    // Delete log files that don't have a valid application and exceed the configured max age.
+    val stale = listing.view(classOf[LogInfo])
+      .index("lastProcessed")
+      .reverse()
+      .first(maxTime)
+      .asScala
+      .toList
+    stale.foreach { log =>
+      if (!log.appId.isDefined) {
+        logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
+        deleteLog(new Path(log.logPath))
+        listing.delete(classOf[LogInfo], log.logPath)
       }
-    } catch {
-      case t: Exception => logError("Exception while cleaning logs", t)
-    } finally {
-      iterator.foreach(_.close())
     }
   }
 
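The rewritten `cleanLogs` above delegates its error handling to `Utils.tryLog` instead of the removed hand-written try/catch/finally. A rough sketch of what such a wrapper does, assuming it simply runs the block and logs any failure (the real `Utils.tryLog` in Spark may differ in signature and logging details):

import scala.util.{Failure, Try}

// Sketch: run the block, log a warning on failure, and surface the result as a Try
// so callers can ignore errors without the method throwing.
def tryLog[T](body: => T): Try[T] = {
  val result = Try(body)
  result match {
    case Failure(e) => println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $e")
    case _ =>
  }
  result
}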
@@ -705,18 +787,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       |  application count=$count}""".stripMargin
   }
 
-  /**
-   * Return the last known size of the given event log, recorded the last time the file
-   * system scanner detected a change in the file.
-   */
-  private def recordedFileSize(log: Path): Long = {
-    try {
-      listing.read(classOf[LogInfo], log.toString()).fileSize
-    } catch {
-      case _: NoSuchElementException => 0L
-    }
-  }
-
   private def load(appId: String): ApplicationInfoWrapper = {
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
@@ -798,6 +868,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
   }
 
+  private def deleteLog(log: Path): Unit = {
+    try {
+      fs.delete(log, true)
+    } catch {
+      case e: AccessControlException =>
+        logInfo(s"No permission to delete $log, ignoring.")
+      case ioe: IOException =>
+        logError(s"IOException in cleaning $log", ioe)
+    }
+  }
+
 }
 
 private[history] object FsHistoryProvider {
@@ -826,6 +907,9 @@ private[history] case class FsHistoryProviderMetadata(
 
 private[history] case class LogInfo(
     @KVIndexParam logPath: String,
+    @KVIndexParam("lastProcessed") lastProcessed: Long,
+    appId: Option[String],
+    attemptId: Option[String],
     fileSize: Long)
 
 private[history] class AttemptInfoWrapper(
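For readers following the new LogInfo fields: `lastProcessed` carries the timestamp of the scan that last saw the file (and backs the "lastProcessed" index queried in checkForLogs and cleanLogs), while `appId` and `attemptId` stay None until the log has been parsed into a valid application, which is what lets the cleaner recognize invalid or corrupt logs. A small stand-alone mirror of that shape (annotation omitted, names hypothetical):

// Mirror of the annotated case class above, for illustration only.
case class LogInfoSketch(
    logPath: String,
    lastProcessed: Long,        // last scan time that saw this file
    appId: Option[String],      // None until the log parses into a valid application
    attemptId: Option[String],
    fileSize: Long)

// A freshly discovered, not-yet-parsed log as checkForLogs would record it.
val unparsed = LogInfoSketch("eventlog-app-3.inprogress", lastProcessed = 1000L, None, None, 0L)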