@@ -26,7 +26,6 @@ import java.util.zip.ZipOutputStream
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.ExecutionException
 import scala.io.Source
 import scala.xml.Node
 
@@ -160,6 +159,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  // Used to store the paths which are currently being processed. This lets the replay log
+  // tasks execute asynchronously while ensuring that checkForLogs does not process a path
+  // repeatedly.
+  private val processing = ConcurrentHashMap.newKeySet[String]
+
+  private def isProcessing(path: Path): Boolean = {
+    processing.contains(path.getName)
+  }
+
+  private def isProcessing(info: LogInfo): Boolean = {
+    processing.contains(info.logPath.split("/").last)
+  }
+
+  private def processing(path: Path): Unit = {
+    processing.add(path.getName)
+  }
+
+  private def endProcessing(path: Path): Unit = {
+    processing.remove(path.getName)
+  }
+
   private val blacklist = new ConcurrentHashMap[String, Long]
 
   // Visible for testing
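
Side note on the hunk above: `ConcurrentHashMap.newKeySet` yields a thread-safe set view, so executor threads can add and remove path names while the scanning thread checks membership, without explicit locking. A minimal standalone sketch of the same idea (the `InFlightTracker` name and keys are illustrative, not part of the patch):

    import java.util.concurrent.ConcurrentHashMap

    object InFlightTracker {
      // Thread-safe set backed by ConcurrentHashMap; safe to mutate from executor threads.
      private val inFlight = ConcurrentHashMap.newKeySet[String]()

      def begin(key: String): Unit = inFlight.add(key)
      def isBusy(key: String): Boolean = inFlight.contains(key)
      def end(key: String): Unit = inFlight.remove(key)

      def main(args: Array[String]): Unit = {
        begin("eventlog-app-1")
        println(isBusy("eventlog-app-1")) // true
        end("eventlog-app-1")
        println(isBusy("eventlog-app-1")) // false
      }
    }
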
@@ -439,6 +458,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
       .filter { entry => !isBlacklisted(entry.getPath) }
+      .filter { entry => !isProcessing(entry.getPath) }
       .flatMap { entry => EventLogFileReader(fs, entry) }
       .filter { reader =>
         try {
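
With the new filter, a directory entry is skipped while its replay task is still running, so the same log is never handed to two tasks. A toy version of that filtering stage, using hypothetical `Entry` records and predicate sets:

    object ScanFilterDemo {
      case class Entry(path: String)

      def main(args: Array[String]): Unit = {
        val blacklisted = Set("bad-log")
        val inFlight = Set("busy-log")
        val entries = Seq(Entry("bad-log"), Entry("busy-log"), Entry("new-log"))
        val updated = entries
          .filter(e => !blacklisted(e.path)) // existing guard
          .filter(e => !inFlight(e.path))    // new guard: skip logs still being replayed
        println(updated) // List(Entry(new-log))
      }
    }
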
@@ -511,11 +531,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
     }
 
-    val tasks = updated.flatMap { entry =>
+    updated.foreach { entry =>
+      processing(entry.rootPath)
       try {
-        val task: Future[Unit] = replayExecutor.submit(
-          () => mergeApplicationListing(entry, newLastScanTime, true))
-        Some(task -> entry.rootPath)
+        val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true)
+        replayExecutor.submit(task)
       } catch {
         // let the iteration over the updated entries break, since an exception on
         // replayExecutor.submit(..) indicates the ExecutorService is unable
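
The submit loop no longer keeps `Future` handles: each entry is marked as in-flight and handed to the pool as a `Runnable` that does its own error handling (see the new `mergeApplicationListing` wrapper later in this diff). A minimal sketch of the fire-and-forget shape, with an illustrative pool and paths:

    import java.util.concurrent.Executors

    object FireAndForgetDemo {
      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(2)
        Seq("log-a", "log-b").foreach { path =>
          // No Future.get(): the task itself is responsible for catching failures.
          val task: Runnable = () => println(s"replaying $path on ${Thread.currentThread().getName}")
          pool.submit(task)
        }
        pool.shutdown()
      }
    }
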
@@ -526,31 +546,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-    pendingReplayTasksCount.addAndGet(tasks.size)
-
-    // Wait for all tasks to finish. This makes sure that checkForLogs
-    // is not scheduled again while some tasks are already running in
-    // the replayExecutor.
-    tasks.foreach { case (task, path) =>
-      try {
-        task.get()
-      } catch {
-        case e: InterruptedException =>
-          throw e
-        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
-          // We don't have read permissions on the log file
-          logWarning(s"Unable to read log $path", e.getCause)
-          blacklist(path)
-          // SPARK-28157 We should remove this blacklisted entry from the KVStore
-          // to handle permission-only changes with the same file sizes later.
-          listing.delete(classOf[LogInfo], path.toString)
-        case e: Exception =>
-          logError("Exception while merging application listings", e)
-      } finally {
-        pendingReplayTasksCount.decrementAndGet()
-      }
-    }
-
     // Delete all information about applications whose log files disappeared from storage.
     // This is done by identifying the event logs which were not touched by the current
     // directory scan.
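
For contrast, the block deleted above used the blocking pattern below: wait on each `Future` and unwrap the `ExecutionException` to reach the real cause, which is also why the `ExecutionException` import could be dropped. A reduced sketch of what was removed, with illustrative names:

    import java.util.concurrent.{Callable, ExecutionException, Executors}

    object BlockingWaitDemo {
      def main(args: Array[String]): Unit = {
        val pool = Executors.newSingleThreadExecutor()
        val task: Callable[Unit] = () => throw new IllegalStateException("boom")
        val future = pool.submit(task)
        try {
          future.get() // blocks; task failures arrive wrapped in ExecutionException
        } catch {
          case e: ExecutionException => println(s"real cause: ${e.getCause}")
        } finally {
          pool.shutdown()
        }
      }
    }
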
@@ -562,7 +557,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .last(newLastScanTime - 1)
       .asScala
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       log.appId.foreach { appId =>
         cleanAppData(appId, log.attemptId, log.logPath)
         listing.delete(classOf[LogInfo], log.logPath)
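
Without the `filterNot(isProcessing)` guard, a log whose replay task is still running would look untouched by the current scan and could be deleted out from under the task. A tiny illustration of the guard, with hypothetical names:

    object StalePruneDemo {
      case class Log(path: String)

      def main(args: Array[String]): Unit = {
        val inFlight = Set("app-2")
        val stale = List(Log("app-1"), Log("app-2"), Log("app-3"))
        val deletable = stale.filterNot(l => inFlight(l.path))
        println(deletable) // List(Log(app-1), Log(app-3)) -- app-2 survives this scan
      }
    }
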
@@ -663,10 +658,42 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private def mergeApplicationListing(
+      reader: EventLogFileReader,
+      scanTime: Long,
+      enableOptimizations: Boolean): Unit = {
+    try {
+      pendingReplayTasksCount.incrementAndGet()
+      doMergeApplicationListing(reader, scanTime, enableOptimizations)
+    } catch {
+      case e: InterruptedException =>
+        throw e
+      case e: AccessControlException =>
+        // We don't have read permissions on the log file
+        logWarning(s"Unable to read log ${reader.rootPath}", e)
+        blacklist(reader.rootPath)
+        // SPARK-28157 We should remove this blacklisted entry from the KVStore
+        // to handle permission-only changes with the same file sizes later.
+        listing.delete(classOf[LogInfo], reader.rootPath.toString)
+      case e: Exception =>
+        logError("Exception while merging application listings", e)
+    } finally {
+      endProcessing(reader.rootPath)
+      pendingReplayTasksCount.decrementAndGet()
+
+      val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis()
+      if (isExpired) {
+        listing.delete(classOf[LogInfo], reader.rootPath.toString)
+        deleteLog(fs, reader.rootPath)
+      }
+    }
+  }
+
   /**
    * Replay the given log file, saving the application in the listing db.
+   * Visible for testing.
    */
-  protected def mergeApplicationListing(
+  private[history] def doMergeApplicationListing(
       reader: EventLogFileReader,
       scanTime: Long,
       enableOptimizations: Boolean): Unit = {
@@ -772,7 +799,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         // mean the end event is before the configured threshold, so call the method again to
         // re-parse the whole log.
         logInfo(s"Reparsing $logPath since end event was not found.")
-        mergeApplicationListing(reader, scanTime, enableOptimizations = false)
+        doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
 
       case _ =>
         // If the app hasn't written down its app ID to the logs, still record the entry in the
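
The renamed `doMergeApplicationListing` keeps the replay logic, while the new `mergeApplicationListing` wrapper owns the bookkeeping: the pending-task counter, the blacklist handling that used to live in the `task.get()` loop, and the `endProcessing` call, which the `finally` clause guarantees runs even if replay throws. A condensed sketch of that shape, assuming stand-in helpers:

    import java.util.concurrent.atomic.AtomicInteger

    object WrapperDemo {
      private val pending = new AtomicInteger(0)
      private def doWork(path: String): Unit = println(s"merging $path")
      private def endProcessing(path: String): Unit = println(s"done $path")

      def work(path: String): Unit = {
        try {
          pending.incrementAndGet()
          doWork(path)
        } catch {
          case e: InterruptedException => throw e   // let shutdown propagate
          case e: Exception => println(s"failed $path: $e")
        } finally {
          endProcessing(path)                       // always clear the in-flight mark
          pending.decrementAndGet()
        }
      }

      def main(args: Array[String]): Unit = work("eventlog-app-1")
    }
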
@@ -826,7 +853,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .filter { l => l.logType == null || l.logType == LogType.EventLogs }
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       if (log.appId.isEmpty) {
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
         deleteLog(fs, new Path(log.logPath))
@@ -934,7 +961,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .filter { l => l.logType != null && l.logType == LogType.DriverLogs }
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       logInfo(s"Deleting invalid driver log ${log.logPath}")
       listing.delete(classOf[LogInfo], log.logPath)
       deleteLog(driverLogFs, new Path(log.logPath))