@@ -20,13 +20,14 @@ package org.apache.spark.deploy.history
2020import java .io .{File , FileNotFoundException , IOException }
2121import java .lang .{Long => JLong }
2222import java .nio .file .Files
23- import java .util .{Date , ServiceLoader }
23+ import java .util .{Date , NoSuchElementException , ServiceLoader }
2424import java .util .concurrent .{ConcurrentHashMap , ExecutorService , Future , TimeUnit }
2525import java .util .zip .ZipOutputStream
2626
2727import scala .collection .JavaConverters ._
2828import scala .collection .mutable
2929import scala .io .Source
30+ import scala .util .{Failure , Success , Try }
3031import scala .xml .Node
3132
3233import com .fasterxml .jackson .annotation .JsonIgnore
@@ -680,12 +681,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
680681 } finally {
681682 endProcessing(reader.rootPath)
682683 pendingReplayTasksCount.decrementAndGet()
683-
684- val isExpired = scanTime + conf.get(MAX_LOG_AGE_S ) * 1000 < clock.getTimeMillis()
685- if (isExpired) {
686- listing.delete(classOf [LogInfo ], reader.rootPath.toString)
687- deleteLog(fs, reader.rootPath)
688- }
684+ checkAndCleanLog(reader.rootPath.toString)
689685 }
690686 }
691687
@@ -824,6 +820,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
824820 }
825821 }
826822
/**
 * Check a single event log against the user-configured max log age and clean it up when it
 * has expired.
 *
 * The log's `LogInfo` entry is looked up in the listing by `logPath`. If no entry exists, or
 * the entry was last processed at or after the cutoff, nothing is done. Otherwise:
 *  - when the entry carries an application id, the attempts of the matching application whose
 *    `lastUpdated` is older than the cutoff are handed to `deleteAttemptLogs`;
 *  - when the entry carries no application id, the log file is deleted and its `LogInfo`
 *    entry is removed from the listing.
 *
 * The whole body runs inside `Utils.tryLog`.
 * NOTE(review): presumably that logs and swallows non-fatal errors - confirm against Utils.
 *
 * @param logPath path of the event log; used as the key of its `LogInfo` entry in the listing.
 */
private def checkAndCleanLog(logPath: String): Unit = Utils.tryLog {
  // Anything older than this instant is past the configured max age (seconds -> millis).
  val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000

  // A missing listing entry simply means there is nothing to clean; any other failure is
  // rethrown so that it is not silently mistaken for "not expired".
  val expiredLog = Try {
    val log = listing.read(classOf[LogInfo], logPath)
    if (log.lastProcessed < maxTime) Some(log) else None
  } match {
    case Success(log) => log
    case Failure(_: NoSuchElementException) => None
    case Failure(e) => throw e
  }

  expiredLog.foreach { log =>
    log.appId match {
      case Some(appId) =>
        // NOTE(review): reverse() + first(maxTime) on the "oldestAttempt" index presumably
        // limits the scan to applications whose oldest attempt is not newer than maxTime -
        // confirm against the KVStoreView contract.
        listing.view(classOf[ApplicationInfoWrapper])
          .index("oldestAttempt")
          .reverse()
          .first(maxTime)
          .asScala
          .filter(_.info.id == appId)
          .foreach { app =>
            // Keep attempts updated at or after the cutoff; the rest are to be deleted.
            val (remaining, toDelete) = app.attempts.partition { attempt =>
              attempt.info.lastUpdated.getTime() >= maxTime
            }
            deleteAttemptLogs(app, remaining, toDelete)
          }

      case None =>
        // No application id was recorded for this log, so it is treated as invalid / corrupt:
        // delete the file first, then drop its listing entry.
        logInfo(s"Deleting invalid / corrupt event log ${log.logPath}")
        deleteLog(fs, new Path(log.logPath))
        listing.delete(classOf[LogInfo], log.logPath)
    }
  }
}
859+
827860 /**
828861 * Delete event logs from the log directory according to the clean policy defined by the user.
829862 */
0 commit comments