@@ -20,13 +20,12 @@ package org.apache.spark.deploy.history
 import java.io.{File, FileNotFoundException, IOException}
 import java.lang.{Long => JLong}
 import java.nio.file.Files
-import java.util.{Date, ServiceLoader}
+import java.util.{Date, NoSuchElementException, ServiceLoader}
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
 import java.util.zip.ZipOutputStream
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.concurrent.ExecutionException
 import scala.io.Source
 import scala.xml.Node
 
@@ -160,6 +159,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  // Used to store the paths that are currently being processed. This enables the replay log
+  // tasks to run asynchronously and ensures that checkForLogs does not process a path repeatedly.
+  private val processing = ConcurrentHashMap.newKeySet[String]
+
+  private def isProcessing(path: Path): Boolean = {
+    processing.contains(path.getName)
+  }
+
+  private def isProcessing(info: LogInfo): Boolean = {
+    processing.contains(info.logPath.split("/").last)
+  }
+
+  private def processing(path: Path): Unit = {
+    processing.add(path.getName)
+  }
+
+  private def endProcessing(path: Path): Unit = {
+    processing.remove(path.getName)
+  }
+
   private val blacklist = new ConcurrentHashMap[String, Long]
 
   // Visible for testing
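
Reviewer note: the four helpers above implement an in-flight guard keyed on the log file name, so the periodic `checkForLogs` scan can skip paths whose replay tasks have not finished. A minimal self-contained sketch of the same pattern (the `InFlightGuard` object and its members are illustrative, not part of this patch):

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Sketch of the guard used above: a concurrent set records keys that are
// being processed so a periodic scan can skip them instead of resubmitting.
object InFlightGuard {
  private val processing = ConcurrentHashMap.newKeySet[String]()
  private val pool = Executors.newFixedThreadPool(4)

  def submitOnce(key: String)(work: => Unit): Unit = {
    // add() returns false when the key is already in flight.
    if (processing.add(key)) {
      pool.submit(new Runnable {
        override def run(): Unit = {
          try work finally processing.remove(key) // always release the guard
        }
      })
    }
  }
}
```

In the patch itself the add happens in `checkForLogs` via `processing(entry.rootPath)` before submission, and the removal happens in the task's `finally` block via `endProcessing`, which is the same lifecycle.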
@@ -440,6 +459,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
     val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
       .filter { entry => !isBlacklisted(entry.getPath) }
+      .filter { entry => !isProcessing(entry.getPath) }
       .flatMap { entry => EventLogFileReader(fs, entry) }
       .filter { reader =>
         try {
@@ -512,43 +532,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
     }
 
-    val tasks = updated.flatMap { entry =>
+    updated.foreach { entry =>
+      processing(entry.rootPath)
       try {
-        val task: Future[Unit] = replayExecutor.submit(
-          () => mergeApplicationListing(entry, newLastScanTime, true))
-        Some(task -> entry.rootPath)
+        val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true)
+        replayExecutor.submit(task)
       } catch {
         // let the iteration over the updated entries break, since an exception on
         // replayExecutor.submit (..) indicates the ExecutorService is unable
         // to take any more submissions at this time
         case e: Exception =>
           logError(s"Exception while submitting event log for replay", e)
-          None
-      }
-    }
-
-    pendingReplayTasksCount.addAndGet(tasks.size)
-
-    // Wait for all tasks to finish. This makes sure that checkForLogs
-    // is not scheduled again while some tasks are already running in
-    // the replayExecutor.
-    tasks.foreach { case (task, path) =>
-      try {
-        task.get()
-      } catch {
-        case e: InterruptedException =>
-          throw e
-        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
-          // We don't have read permissions on the log file
-          logWarning(s"Unable to read log $path", e.getCause)
-          blacklist(path)
-          // SPARK-28157 We should remove this blacklisted entry from the KVStore
-          // to handle permission-only changes with the same file sizes later.
-          listing.delete(classOf[LogInfo], path.toString)
-        case e: Exception =>
-          logError("Exception while merging application listings", e)
-      } finally {
-        pendingReplayTasksCount.decrementAndGet()
       }
     }
 
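
Reviewer note: the blocking `task.get()` loop is gone; `checkForLogs` now fires and forgets, and the bookkeeping (`pendingReplayTasksCount`, blacklisting, cleanup) moves into the task itself. One subtlety worth calling out: `ExecutorService.submit` is overloaded for both `Runnable` and `Callable`, so a bare Scala lambda can be ambiguous under SAM conversion; pinning the type with `val task: Runnable = ...`, as the patch does, makes the chosen overload explicit. A hedged sketch (names illustrative):

```scala
import java.util.concurrent.{ExecutorService, Executors}

object FireAndForget {
  val pool: ExecutorService = Executors.newSingleThreadExecutor()

  def submit(work: () => Unit): Unit = {
    // Annotating the type selects the submit(Runnable) overload explicitly.
    val task: Runnable = () => work()
    pool.submit(task) // the returned Future is intentionally discarded
  }
}
```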
@@ -563,7 +557,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .last(newLastScanTime - 1)
       .asScala
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       log.appId.foreach { appId =>
         cleanAppData(appId, log.attemptId, log.logPath)
         listing.delete(classOf[LogInfo], log.logPath)
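
Reviewer note: this `filterNot(isProcessing)` guard, repeated in the two cleaner scans further down, keeps a stale-looking listing entry from being deleted while its log is still mid-replay. The `LogInfo` overload matches on the last path segment, mirroring `Path.getName`. Illustrative shape of the guard (types invented for the sketch):

```scala
// Sketch: skip in-flight entries before destructive cleanup.
case class LogEntry(logPath: String)

object CleanerGuard {
  private val processing = java.util.concurrent.ConcurrentHashMap.newKeySet[String]()

  def isProcessing(e: LogEntry): Boolean = processing.contains(e.logPath.split("/").last)

  def clean(stale: List[LogEntry])(delete: LogEntry => Unit): Unit =
    stale.filterNot(isProcessing).foreach(delete)
}
```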
@@ -664,10 +658,39 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private def mergeApplicationListing(
+      reader: EventLogFileReader,
+      scanTime: Long,
+      enableOptimizations: Boolean): Unit = {
+    try {
+      pendingReplayTasksCount.incrementAndGet()
+      doMergeApplicationListing(reader, scanTime, enableOptimizations)
+      if (conf.get(CLEANER_ENABLED)) {
+        checkAndCleanLog(reader.rootPath.toString)
+      }
+    } catch {
+      case e: InterruptedException =>
+        throw e
+      case e: AccessControlException =>
+        // We don't have read permissions on the log file
+        logWarning(s"Unable to read log ${reader.rootPath}", e)
+        blacklist(reader.rootPath)
+        // SPARK-28157 We should remove this blacklisted entry from the KVStore
+        // to handle permission-only changes with the same file sizes later.
+        listing.delete(classOf[LogInfo], reader.rootPath.toString)
+      case e: Exception =>
+        logError("Exception while merging application listings", e)
+    } finally {
+      endProcessing(reader.rootPath)
+      pendingReplayTasksCount.decrementAndGet()
+    }
+  }
+
   /**
    * Replay the given log file, saving the application in the listing db.
+   * Visible for testing.
    */
-  protected def mergeApplicationListing(
+  private[history] def doMergeApplicationListing(
       reader: EventLogFileReader,
       scanTime: Long,
       enableOptimizations: Boolean): Unit = {
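
Reviewer note: the new `mergeApplicationListing` wrapper carries the error handling that previously lived in the `task.get()` loop. Because the handler now runs inside the task, an `AccessControlException` arrives directly rather than wrapped in an `ExecutionException` fished out with `getCause`, which is why the `scala.concurrent.ExecutionException` import could be dropped. A small sketch of the two failure shapes (exception type and messages invented for illustration):

```scala
import java.util.concurrent.{ExecutionException, Executors}

object FailureShapes {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newSingleThreadExecutor()

    // Before: the caller blocked on get(), so failures surfaced wrapped.
    val future = pool.submit(new Runnable {
      override def run(): Unit = throw new IllegalStateException("boom")
    })
    try future.get() catch {
      case e: ExecutionException => println(s"wrapped: ${e.getCause}")
    }

    // After: a handler inside the task sees the raw exception.
    pool.submit(new Runnable {
      override def run(): Unit = {
        try throw new IllegalStateException("boom")
        catch { case e: IllegalStateException => println(s"direct: $e") }
      }
    })
    pool.shutdown()
  }
}
```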
@@ -773,7 +796,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         // mean the end event is before the configured threshold, so call the method again to
         // re-parse the whole log.
         logInfo(s"Reparsing $logPath since end event was not found.")
-        mergeApplicationListing(reader, scanTime, enableOptimizations = false)
+        doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
 
       case _ =>
         // If the app hasn't written down its app ID to the logs, still record the entry in the
@@ -798,6 +821,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  /**
+   * Check and delete the specified event log according to the max log age defined by the user.
+   */
+  private[history] def checkAndCleanLog(logPath: String): Unit = Utils.tryLog {
+    val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
+    val log = listing.read(classOf[LogInfo], logPath)
+
+    if (log.lastProcessed <= maxTime && log.appId.isEmpty) {
+      logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
+      deleteLog(fs, new Path(log.logPath))
+      listing.delete(classOf[LogInfo], log.logPath)
+    }
+
+    log.appId.foreach { appId =>
+      val app = listing.read(classOf[ApplicationInfoWrapper], appId)
+      if (app.oldestAttempt() <= maxTime) {
+        val (remaining, toDelete) = app.attempts.partition { attempt =>
+          attempt.info.lastUpdated.getTime() >= maxTime
+        }
+        deleteAttemptLogs(app, remaining, toDelete)
+      }
+    }
+  }
+
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
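
Reviewer note: `checkAndCleanLog` applies the same retention policy as the periodic cleaner, but to a single log right after its replay finishes (gated on `CLEANER_ENABLED` in the wrapper above). The partition keeps attempts whose `lastUpdated` falls within the retention window and deletes the rest. A self-contained illustration of the cutoff arithmetic, with invented timestamps:

```scala
object RetentionCutoff {
  def main(args: Array[String]): Unit = {
    val maxAgeSeconds = 7L * 24 * 60 * 60    // e.g. a seven-day max log age
    val now = System.currentTimeMillis()
    val maxTime = now - maxAgeSeconds * 1000 // same cutoff formula as the patch

    // lastUpdated times for three hypothetical attempts: 1s, 8d, and 1h old.
    val attempts = Seq(now - 1000L, now - 8L * 24 * 60 * 60 * 1000, now - 3600L * 1000)
    val (remaining, toDelete) = attempts.partition(_ >= maxTime)
    println(s"keep=${remaining.size} delete=${toDelete.size}") // keep=2 delete=1
  }
}
```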
@@ -827,7 +874,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .filter { l => l.logType == null || l.logType == LogType.EventLogs }
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       if (log.appId.isEmpty) {
         logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
         deleteLog(fs, new Path(log.logPath))
@@ -935,7 +982,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       .asScala
       .filter { l => l.logType != null && l.logType == LogType.DriverLogs }
       .toList
-    stale.foreach { log =>
+    stale.filterNot(isProcessing).foreach { log =>
       logInfo(s"Deleting invalid driver log ${log.logPath}")
       listing.delete(classOf[LogInfo], log.logPath)
       deleteLog(driverLogFs, new Path(log.logPath))