From b6cd802f58c90166f69d3880c153674fd55fd18b Mon Sep 17 00:00:00 2001 From: turbofei Date: Wed, 6 Nov 2019 22:06:48 +0800 Subject: [PATCH 1/7] [SPARK-29043] Improve the concurrent performance of History Server --- .../deploy/history/FsHistoryProvider.scala | 97 ++++++++++++------- .../history/FsHistoryProviderSuite.scala | 10 +- 2 files changed, 67 insertions(+), 40 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a3776b3ad756..2b20f62f3a64 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -26,7 +26,6 @@ import java.util.zip.ZipOutputStream import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.concurrent.ExecutionException import scala.io.Source import scala.xml.Node @@ -160,6 +159,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) new HistoryServerDiskManager(conf, path, listing, clock) } + // Used to store the paths, which are being processed. This enable the replay log tasks execute + // asynchronously and make sure that checkForLogs would not process a path repeatedly. + private val processing = ConcurrentHashMap.newKeySet[String] + + private def isProcessing(path: Path): Boolean = { + processing.contains(path.getName) + } + + private def isProcessing(info: LogInfo): Boolean = { + processing.contains(info.logPath.split("/").last) + } + + private def processing(path: Path): Unit = { + processing.add(path.getName) + } + + private def endProcessing(path: Path): Unit = { + processing.remove(path.getName) + } + private val blacklist = new ConcurrentHashMap[String, Long] // Visible for testing @@ -440,6 +459,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) .filter { entry => !isBlacklisted(entry.getPath) } + .filter { entry => !isProcessing(entry.getPath) } .flatMap { entry => EventLogFileReader(fs, entry) } .filter { reader => try { @@ -512,11 +532,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}") } - val tasks = updated.flatMap { entry => + updated.foreach { entry => + processing(entry.rootPath) try { - val task: Future[Unit] = replayExecutor.submit( - () => mergeApplicationListing(entry, newLastScanTime, true)) - Some(task -> entry.rootPath) + val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true) + replayExecutor.submit(task) } catch { // let the iteration over the updated entries break, since an exception on // replayExecutor.submit (..) indicates the ExecutorService is unable @@ -527,31 +547,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - pendingReplayTasksCount.addAndGet(tasks.size) - - // Wait for all tasks to finish. This makes sure that checkForLogs - // is not scheduled again while some tasks are already running in - // the replayExecutor. - tasks.foreach { case (task, path) => - try { - task.get() - } catch { - case e: InterruptedException => - throw e - case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] => - // We don't have read permissions on the log file - logWarning(s"Unable to read log $path", e.getCause) - blacklist(path) - // SPARK-28157 We should remove this blacklisted entry from the KVStore - // to handle permission-only changes with the same file sizes later. - listing.delete(classOf[LogInfo], path.toString) - case e: Exception => - logError("Exception while merging application listings", e) - } finally { - pendingReplayTasksCount.decrementAndGet() - } - } - // Delete all information about applications whose log files disappeared from storage. // This is done by identifying the event logs which were not touched by the current // directory scan. @@ -563,7 +558,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .last(newLastScanTime - 1) .asScala .toList - stale.foreach { log => + stale.filterNot(isProcessing).foreach { log => log.appId.foreach { appId => cleanAppData(appId, log.attemptId, log.logPath) listing.delete(classOf[LogInfo], log.logPath) @@ -664,10 +659,42 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + private def mergeApplicationListing( + reader: EventLogFileReader, + scanTime: Long, + enableOptimizations: Boolean): Unit = { + try { + pendingReplayTasksCount.incrementAndGet() + doMergeApplicationListing(reader, scanTime, enableOptimizations) + } catch { + case e: InterruptedException => + throw e + case e: AccessControlException => + // We don't have read permissions on the log file + logWarning(s"Unable to read log ${reader.rootPath}", e) + blacklist(reader.rootPath) + // SPARK-28157 We should remove this blacklisted entry from the KVStore + // to handle permission-only changes with the same file sizes later. + listing.delete(classOf[LogInfo], reader.rootPath.toString) + case e: Exception => + logError("Exception while merging application listings", e) + } finally { + endProcessing(reader.rootPath) + pendingReplayTasksCount.decrementAndGet() + + val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis() + if (isExpired) { + listing.delete(classOf[LogInfo], reader.rootPath.toString) + deleteLog(fs, reader.rootPath) + } + } + } + /** * Replay the given log file, saving the application in the listing db. + * Visable for testing */ - protected def mergeApplicationListing( + private[history] def doMergeApplicationListing( reader: EventLogFileReader, scanTime: Long, enableOptimizations: Boolean): Unit = { @@ -773,7 +800,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // mean the end event is before the configured threshold, so call the method again to // re-parse the whole log. logInfo(s"Reparsing $logPath since end event was not found.") - mergeApplicationListing(reader, scanTime, enableOptimizations = false) + doMergeApplicationListing(reader, scanTime, enableOptimizations = false) case _ => // If the app hasn't written down its app ID to the logs, still record the entry in the @@ -827,7 +854,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .asScala .filter { l => l.logType == null || l.logType == LogType.EventLogs } .toList - stale.foreach { log => + stale.filterNot(isProcessing).foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") deleteLog(fs, new Path(log.logPath)) @@ -935,7 +962,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .asScala .filter { l => l.logType != null && l.logType == LogType.DriverLogs } .toList - stale.foreach { log => + stale.filterNot(isProcessing).foreach { log => logInfo(s"Deleting invalid driver log ${log.logPath}") listing.delete(classOf[LogInfo], log.logPath) deleteLog(driverLogFs, new Path(log.logPath)) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index ed195dd44e91..b3f07a01bc73 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -160,13 +160,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { assume(!Utils.isWindows) class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) { - var mergeApplicationListingCall = 0 - override protected def mergeApplicationListing( + var doMergeApplicationListingCall = 0 + override private[history] def doMergeApplicationListing( reader: EventLogFileReader, lastSeen: Long, enableSkipToEnd: Boolean): Unit = { - super.mergeApplicationListing(reader, lastSeen, enableSkipToEnd) - mergeApplicationListingCall += 1 + super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd) + doMergeApplicationListingCall += 1 } } val provider = new TestFsHistoryProvider @@ -187,7 +187,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { list.size should be (1) } - provider.mergeApplicationListingCall should be (1) + provider.doMergeApplicationListingCall should be (1) } test("history file is renamed from inprogress to completed") { From 7621626dc5a1ccda80eeef7cb78b2811b8db0a9b Mon Sep 17 00:00:00 2001 From: turbofei Date: Thu, 7 Nov 2019 18:02:59 +0800 Subject: [PATCH 2/7] check and delete expired log --- .../deploy/history/FsHistoryProvider.scala | 47 ++++++++++++++++--- 1 file changed, 40 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 2b20f62f3a64..afe40f67d162 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -20,13 +20,14 @@ package org.apache.spark.deploy.history import java.io.{File, FileNotFoundException, IOException} import java.lang.{Long => JLong} import java.nio.file.Files -import java.util.{Date, ServiceLoader} +import java.util.{Date, NoSuchElementException, ServiceLoader} import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit} import java.util.zip.ZipOutputStream import scala.collection.JavaConverters._ import scala.collection.mutable import scala.io.Source +import scala.util.{Failure, Success, Try} import scala.xml.Node import com.fasterxml.jackson.annotation.JsonIgnore @@ -681,12 +682,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } finally { endProcessing(reader.rootPath) pendingReplayTasksCount.decrementAndGet() - - val isExpired = scanTime + conf.get(MAX_LOG_AGE_S) * 1000 < clock.getTimeMillis() - if (isExpired) { - listing.delete(classOf[LogInfo], reader.rootPath.toString) - deleteLog(fs, reader.rootPath) - } + checkAndCleanLog(reader.rootPath.toString) } } @@ -825,6 +821,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + /** + * Check and delete specified event log according to the max log age defined by the user. + */ + private def checkAndCleanLog(logPath: String): Unit = Utils.tryLog { + val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 + val expiredLog = Try { + val log = listing.read(classOf[LogInfo], logPath) + if (log.lastProcessed < maxTime) Some(log) else None + } match { + case Success(log) => log + case Failure(_: NoSuchElementException) => None + case Failure(e) => throw e + } + + expiredLog.foreach { log => + log.appId.foreach { appId => + listing.view(classOf[ApplicationInfoWrapper]) + .index("oldestAttempt") + .reverse() + .first(maxTime) + .asScala + .filter(_.info.id == appId) + .foreach { app => + val (remaining, toDelete) = app.attempts.partition { attempt => + attempt.info.lastUpdated.getTime() >= maxTime + } + deleteAttemptLogs(app, remaining, toDelete) + } + } + if (log.appId.isEmpty) { + logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") + deleteLog(fs, new Path(log.logPath)) + listing.delete(classOf[LogInfo], log.logPath) + } + } + } + /** * Delete event logs from the log directory according to the clean policy defined by the user. */ From 44e3c80ac61d1bff6704851cd142a738e140a4bc Mon Sep 17 00:00:00 2001 From: turbofei Date: Tue, 12 Nov 2019 18:53:00 +0800 Subject: [PATCH 3/7] remove used code --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index afe40f67d162..afb3c5e83092 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -544,7 +544,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // to take any more submissions at this time case e: Exception => logError(s"Exception while submitting event log for replay", e) - None } } From 8540c396d9df53e269ada1c93420905fe0e7583c Mon Sep 17 00:00:00 2001 From: turbofei Date: Fri, 13 Dec 2019 15:36:34 +0800 Subject: [PATCH 4/7] fix code --- .../deploy/history/FsHistoryProvider.scala | 45 +++++++------------ 1 file changed, 17 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index afb3c5e83092..0a363df7c643 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -681,7 +681,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } finally { endProcessing(reader.rootPath) pendingReplayTasksCount.decrementAndGet() - checkAndCleanLog(reader.rootPath.toString) + if (conf.get(CLEANER_ENABLED)) { + checkAndCleanLog(reader.rootPath.toString) + } } } @@ -823,36 +825,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** * Check and delete specified event log according to the max log age defined by the user. */ - private def checkAndCleanLog(logPath: String): Unit = Utils.tryLog { + private[history] def checkAndCleanLog(logPath: String): Unit = Utils.tryLog { val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 - val expiredLog = Try { - val log = listing.read(classOf[LogInfo], logPath) - if (log.lastProcessed < maxTime) Some(log) else None - } match { - case Success(log) => log - case Failure(_: NoSuchElementException) => None - case Failure(e) => throw e + val log = listing.read(classOf[LogInfo], logPath) + + if (log.lastProcessed < maxTime && log.appId.isEmpty) { + logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") + deleteLog(fs, new Path(log.logPath)) + listing.delete(classOf[LogInfo], log.logPath) } - expiredLog.foreach { log => - log.appId.foreach { appId => - listing.view(classOf[ApplicationInfoWrapper]) - .index("oldestAttempt") - .reverse() - .first(maxTime) - .asScala - .filter(_.info.id == appId) - .foreach { app => - val (remaining, toDelete) = app.attempts.partition { attempt => - attempt.info.lastUpdated.getTime() >= maxTime - } - deleteAttemptLogs(app, remaining, toDelete) - } - } - if (log.appId.isEmpty) { - logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") - deleteLog(fs, new Path(log.logPath)) - listing.delete(classOf[LogInfo], log.logPath) + log.appId.foreach { appId => + val appInfo = listing.read(classOf[ApplicationInfoWrapper], appId) + if (appInfo.oldestAttempt() < maxTime) { + val (remaining, toDelete) = appInfo.attempts.partition { attempt => + attempt.info.lastUpdated.getTime() >= maxTime + } + deleteAttemptLogs(appInfo, remaining, toDelete) } } } From 34aeab8a5f001f8c05729ea63b1bf532addd52bd Mon Sep 17 00:00:00 2001 From: turbofei Date: Fri, 13 Dec 2019 18:58:07 +0800 Subject: [PATCH 5/7] add ut --- .../deploy/history/FsHistoryProvider.scala | 10 +++--- .../history/FsHistoryProviderSuite.scala | 34 +++++++++++++++++++ 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 0a363df7c643..a5811b6c5853 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -829,19 +829,19 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 val log = listing.read(classOf[LogInfo], logPath) - if (log.lastProcessed < maxTime && log.appId.isEmpty) { + if (log.lastProcessed <= maxTime && log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") deleteLog(fs, new Path(log.logPath)) listing.delete(classOf[LogInfo], log.logPath) } log.appId.foreach { appId => - val appInfo = listing.read(classOf[ApplicationInfoWrapper], appId) - if (appInfo.oldestAttempt() < maxTime) { - val (remaining, toDelete) = appInfo.attempts.partition { attempt => + val app = listing.read(classOf[ApplicationInfoWrapper], appId) + if (app.oldestAttempt() <= maxTime) { + val (remaining, toDelete) = app.attempts.partition { attempt => attempt.info.lastUpdated.getTime() >= maxTime } - deleteAttemptLogs(appInfo, remaining, toDelete) + deleteAttemptLogs(app, remaining, toDelete) } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index b3f07a01bc73..1819c21409be 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1321,6 +1321,40 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { assertSerDe(serializer, attemptInfoWithIndex) } + test("SPARK-29043: clean up specified event log") { + def getLogPath(logFile: File): String = { + val uri = logFile.toURI + uri.getScheme + File.pathSeparator + uri.getPath + } + + val clock = new ManualClock() + val conf = createTestConf().set(MAX_LOG_AGE_S.key, "2d") + val provider = new FsHistoryProvider(conf, clock) + + // create an invalid application log file + val nonValidLogFile = newLogFile("NonValidLogFile", None, inProgress = true) + nonValidLogFile.createNewFile() + writeFile(nonValidLogFile, None, + SparkListenerApplicationStart(nonValidLogFile.getName, None, 1L, "test", None)) + nonValidLogFile.setLastModified(clock.getTimeMillis()) + + // create a valid application log file + val validLogFile = newLogFile("validLogFile", None, inProgress = true) + validLogFile.createNewFile() + writeFile(validLogFile, None, + SparkListenerApplicationStart(validLogFile.getName, Some("local_123"), 1L, "test", None)) + validLogFile.setLastModified(clock.getTimeMillis()) + + provider.checkForLogs() + clock.advance(TimeUnit.DAYS.toMillis(2)) + provider.checkAndCleanLog(getLogPath(nonValidLogFile)) + assert(new File(testDir.toURI).listFiles().size === 1) + + clock.advance(1) + provider.checkAndCleanLog(getLogPath(validLogFile)) + assert(new File(testDir.toURI).listFiles().size === 0) + } + private def assertOptionAfterSerde(opt: Option[Long], expected: Option[Long]): Unit = { if (expected.isEmpty) { assert(opt.isEmpty) From c6ac35ef199184a116277c12ec27c5d55c3e1b74 Mon Sep 17 00:00:00 2001 From: turbofei Date: Fri, 13 Dec 2019 19:00:41 +0800 Subject: [PATCH 6/7] fix style --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a5811b6c5853..c65350a1765d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -27,7 +27,6 @@ import java.util.zip.ZipOutputStream import scala.collection.JavaConverters._ import scala.collection.mutable import scala.io.Source -import scala.util.{Failure, Success, Try} import scala.xml.Node import com.fasterxml.jackson.annotation.JsonIgnore From e9ebb6fd690c6e3a786b3285058c59850107386b Mon Sep 17 00:00:00 2001 From: turbofei Date: Sun, 15 Dec 2019 00:29:51 +0800 Subject: [PATCH 7/7] fix ut --- .../deploy/history/FsHistoryProvider.scala | 6 ++--- .../history/FsHistoryProviderSuite.scala | 23 ++++++++----------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index c65350a1765d..f560b7e9157b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -665,6 +665,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { pendingReplayTasksCount.incrementAndGet() doMergeApplicationListing(reader, scanTime, enableOptimizations) + if (conf.get(CLEANER_ENABLED)) { + checkAndCleanLog(reader.rootPath.toString) + } } catch { case e: InterruptedException => throw e @@ -680,9 +683,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } finally { endProcessing(reader.rootPath) pendingReplayTasksCount.decrementAndGet() - if (conf.get(CLEANER_ENABLED)) { - checkAndCleanLog(reader.rootPath.toString) - } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 1819c21409be..eed42c546dcf 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1322,21 +1322,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } test("SPARK-29043: clean up specified event log") { - def getLogPath(logFile: File): String = { - val uri = logFile.toURI - uri.getScheme + File.pathSeparator + uri.getPath - } - val clock = new ManualClock() - val conf = createTestConf().set(MAX_LOG_AGE_S.key, "2d") + val conf = createTestConf().set(MAX_LOG_AGE_S.key, "0").set(CLEANER_ENABLED.key, "true") val provider = new FsHistoryProvider(conf, clock) // create an invalid application log file - val nonValidLogFile = newLogFile("NonValidLogFile", None, inProgress = true) - nonValidLogFile.createNewFile() - writeFile(nonValidLogFile, None, - SparkListenerApplicationStart(nonValidLogFile.getName, None, 1L, "test", None)) - nonValidLogFile.setLastModified(clock.getTimeMillis()) + val inValidLogFile = newLogFile("inValidLogFile", None, inProgress = true) + inValidLogFile.createNewFile() + writeFile(inValidLogFile, None, + SparkListenerApplicationStart(inValidLogFile.getName, None, 1L, "test", None)) + inValidLogFile.setLastModified(clock.getTimeMillis()) // create a valid application log file val validLogFile = newLogFile("validLogFile", None, inProgress = true) @@ -1346,12 +1341,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { validLogFile.setLastModified(clock.getTimeMillis()) provider.checkForLogs() - clock.advance(TimeUnit.DAYS.toMillis(2)) - provider.checkAndCleanLog(getLogPath(nonValidLogFile)) + // The invalid application log file would be cleaned by checkAndCleanLog(). assert(new File(testDir.toURI).listFiles().size === 1) clock.advance(1) - provider.checkAndCleanLog(getLogPath(validLogFile)) + // cleanLogs() would clean the valid application log file. + provider.cleanLogs() assert(new File(testDir.toURI).listFiles().size === 0) }