@@ -25,7 +25,6 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.ExecutionException
 import scala.io.Source
 import scala.util.Try
 import scala.xml.Node
@@ -161,6 +160,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  // Used to store the paths that are currently being processed. This enables the replay log
+  // tasks to execute asynchronously while ensuring checkForLogs does not process a path twice.
+  private val processing = ConcurrentHashMap.newKeySet[String]
+
+  private def isProcessing(path: Path): Boolean = {
+    processing.contains(path.getName)
+  }
+
+  private def isProcessing(info: LogInfo): Boolean = {
+    processing.contains(info.logPath.split("/").last)
+  }
+
+  private def processing(path: Path): Unit = {
+    processing.add(path.getName)
+  }
+
+  private def endProcessing(path: Path): Unit = {
+    processing.remove(path.getName)
+  }
+
   private val blacklist = new ConcurrentHashMap[String, Long]
 
   // Visible for testing
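For readers skimming the diff: the dedup above hinges on `ConcurrentHashMap.newKeySet`, which returns a `Set` view backed by a `ConcurrentHashMap`, so `add`/`contains`/`remove` are safe between the scanning thread and the replay executor threads. A minimal, self-contained sketch of the pattern follows; the names (`inFlight`, `tryBegin`, `end`) are illustrative, not part of the patch:

```scala
import java.util.concurrent.ConcurrentHashMap

object ProcessingSetSketch {
  // A concurrent Set backed by a ConcurrentHashMap; thread-safe without
  // external locking.
  private val inFlight = ConcurrentHashMap.newKeySet[String]()

  // add() returns false if the element was already present, so a second
  // caller learns atomically that someone else has claimed the log.
  def tryBegin(logName: String): Boolean = inFlight.add(logName)
  def end(logName: String): Unit = inFlight.remove(logName)

  def main(args: Array[String]): Unit = {
    assert(tryBegin("app-1234"))   // first scan claims the log
    assert(!tryBegin("app-1234"))  // a concurrent scan skips it
    end("app-1234")
    assert(tryBegin("app-1234"))   // free to process again
  }
}
```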
@@ -440,6 +459,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
       .filter { entry => !isBlacklisted(entry.getPath) }
+      .filter { entry => !isProcessing(entry.getPath) }
       .flatMap { entry => EventLogFileReader(fs, entry) }
       .filter { reader =>
         try {
@@ -512,8 +532,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
     }
 
-    val tasks = updated.flatMap { entry =>
+    updated.flatMap { entry =>
       try {
+        processing(entry.rootPath)
         val task: Future[Unit] = replayExecutor.submit(
           () => mergeApplicationListing(entry, newLastScanTime, true))
         Some(task -> entry.rootPath)
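The ordering here matters: the path is marked as processing before the task is handed to `replayExecutor`, so a rescan that starts while the submit is in flight already sees the path as claimed (the unmark happens in the wrapper's `finally`, shown in a later hunk). A hedged sketch of that ordering with a plain `ExecutorService`; the names and the latch-based demo are hypothetical, not the provider's actual members:

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}

object SubmitOnceSketch {
  private val inFlight = ConcurrentHashMap.newKeySet[String]()
  private val executor = Executors.newFixedThreadPool(2)

  // Returns true if the task was submitted, false if the path was already claimed.
  def submitOnce(path: String)(work: String => Unit): Boolean = {
    // Mark before submit: a concurrent scan that runs between these two
    // statements still sees the path as claimed and skips it.
    if (inFlight.add(path)) {
      executor.submit(new Runnable {
        override def run(): Unit =
          try work(path)
          finally inFlight.remove(path) // always release, even on failure
      })
      true
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    val started = new CountDownLatch(1)
    val release = new CountDownLatch(1)
    submitOnce("app-1") { p =>
      started.countDown()
      release.await() // hold the task open so the second scan overlaps it
      println(s"replayed $p")
    }
    started.await()
    // While the first task is still running, a second scan is a no-op.
    assert(!submitOnce("app-1")(_ => ()))
    release.countDown()
    executor.shutdown()
    executor.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```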
@@ -527,31 +548,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-    pendingReplayTasksCount.addAndGet(tasks.size)
-
-    // Wait for all tasks to finish. This makes sure that checkForLogs
-    // is not scheduled again while some tasks are already running in
-    // the replayExecutor.
-    tasks.foreach { case (task, path) =>
-      try {
-        task.get()
-      } catch {
-        case e: InterruptedException =>
-          throw e
-        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
-          // We don't have read permissions on the log file
-          logWarning(s"Unable to read log $path", e.getCause)
-          blacklist(path)
-          // SPARK-28157 We should remove this blacklisted entry from the KVStore
-          // to handle permission-only changes with the same file sizes later.
-          listing.delete(classOf[LogInfo], path.toString)
-        case e: Exception =>
-          logError("Exception while merging application listings", e)
-      } finally {
-        pendingReplayTasksCount.decrementAndGet()
-      }
-    }
-
     // Delete all information about applications whose log files disappeared from storage.
     // This is done by identifying the event logs which were not touched by the current
     // directory scan.
@@ -563,7 +559,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .last(newLastScanTime - 1)
       .asScala
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       log.appId.foreach { appId =>
        cleanAppData(appId, log.attemptId, log.logPath)
        listing.delete(classOf[LogInfo], log.logPath)
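Skipping in-flight entries in this sweep (and in the two analogous sweeps below) appears to be what makes the asynchronous replay safe: a log whose replay task is still running may not yet have its `LogInfo` stamped with the new scan time, so without the `filterNot` it could look stale and be cleaned up mid-replay. A small sketch of the guard; `LogInfoSketch` and `inFlight` are stand-ins for the real listing rows and processing set:

```scala
object StaleSweepSketch {
  // Stand-in for the listing's LogInfo rows.
  final case class LogInfoSketch(logPath: String)

  def main(args: Array[String]): Unit = {
    val inFlight = Set("app-2") // log names currently being replayed
    def isProcessing(l: LogInfoSketch): Boolean =
      inFlight.contains(l.logPath.split("/").last)

    val stale = List(LogInfoSketch("/logs/app-1"), LogInfoSketch("/logs/app-2"))
    // app-2 is spared: its replay simply has not reported in yet.
    assert(stale.filterNot(isProcessing).map(_.logPath) == List("/logs/app-1"))
  }
}
```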
@@ -664,10 +660,36 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private def mergeApplicationListing(
+      reader: EventLogFileReader,
+      scanTime: Long,
+      enableOptimizations: Boolean): Unit = {
+    try {
+      pendingReplayTasksCount.incrementAndGet()
+      doMergeApplicationListing(reader, scanTime, enableOptimizations)
+    } catch {
+      case e: InterruptedException =>
+        throw e
+      case e: AccessControlException =>
+        // We don't have read permissions on the log file
+        logWarning(s"Unable to read log ${reader.rootPath}", e)
+        blacklist(reader.rootPath)
+        // SPARK-28157 We should remove this blacklisted entry from the KVStore
+        // to handle permission-only changes with the same file sizes later.
+        listing.delete(classOf[LogInfo], reader.rootPath.toString)
+      case e: Exception =>
+        logError("Exception while merging application listings", e)
+    } finally {
+      endProcessing(reader.rootPath)
+      pendingReplayTasksCount.decrementAndGet()
+    }
+  }
+
   /**
    * Replay the given log file, saving the application in the listing db.
+   * Visible for testing.
    */
-  protected def mergeApplicationListing(
+  private[history] def doMergeApplicationListing(
       reader: EventLogFileReader,
       scanTime: Long,
       enableOptimizations: Boolean): Unit = {
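The split above is a familiar wrapper/worker shape: the new `mergeApplicationListing` owns the bookkeeping (pending-task counter, blacklisting on `AccessControlException`, `endProcessing` in `finally`), while `doMergeApplicationListing` keeps the parsing logic and, being `private[history]`, stays directly callable from tests. A stripped-down sketch of the shape; the class, counter, and method names are illustrative only:

```scala
import java.util.concurrent.atomic.AtomicInteger

class WrapperWorkerSketch {
  private val pendingTasks = new AtomicInteger(0)

  // Wrapper: bookkeeping and error handling live here, exactly once per call.
  def merge(path: String): Unit = {
    try {
      pendingTasks.incrementAndGet()
      doMerge(path)
    } catch {
      case e: InterruptedException => throw e // let shutdown interrupts propagate
      case e: Exception => println(s"failed to merge $path: ${e.getMessage}")
    } finally {
      pendingTasks.decrementAndGet() // released whether doMerge succeeded or not
    }
  }

  // Worker: the logic itself. In the real patch this is `private[history]`,
  // so tests can drive it directly without the executor or the counters.
  def doMerge(path: String): Unit = {
    require(path.nonEmpty, "empty path")
    println(s"parsed $path")
  }
}
```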
@@ -773,7 +795,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // mean the end event is before the configured threshold, so call the method again to
       // re-parse the whole log.
       logInfo(s"Reparsing $logPath since end event was not found.")
-      mergeApplicationListing(reader, scanTime, enableOptimizations = false)
+      doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
 
     case _ =>
       // If the app hasn't written down its app ID to the logs, still record the entry in the
@@ -827,7 +849,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .filter { l => l.logType == null || l.logType == LogType.EventLogs }
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       if (log.appId.isEmpty) {
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
         deleteLog(fs, new Path(log.logPath))
@@ -935,7 +957,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .filter { l => l.logType != null && l.logType == LogType.DriverLogs }
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       logInfo(s"Deleting invalid driver log ${log.logPath}")
       listing.delete(classOf[LogInfo], log.logPath)
       deleteLog(driverLogFs, new Path(log.logPath))