diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index f2ee5994a8f7..5f9b18ce0127 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -805,6 +805,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private[history] def cleanLogs(): Unit = Utils.tryLog {
     val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
+    val maxNum = conf.get(MAX_LOG_NUM)
 
     val expired = listing.view(classOf[ApplicationInfoWrapper])
       .index("oldestAttempt")
@@ -817,23 +818,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val (remaining, toDelete) = app.attempts.partition { attempt =>
         attempt.info.lastUpdated.getTime() >= maxTime
       }
-
-      if (remaining.nonEmpty) {
-        val newApp = new ApplicationInfoWrapper(app.info, remaining)
-        listing.write(newApp)
-      }
-
-      toDelete.foreach { attempt =>
-        logInfo(s"Deleting expired event log for ${attempt.logPath}")
-        val logPath = new Path(logDir, attempt.logPath)
-        listing.delete(classOf[LogInfo], logPath.toString())
-        cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
-        deleteLog(fs, logPath)
-      }
-
-      if (remaining.isEmpty) {
-        listing.delete(app.getClass(), app.id)
-      }
+      deleteAttemptLogs(app, remaining, toDelete)
     }
 
     // Delete log files that don't have a valid application and exceed the configured max age.
@@ -851,10 +836,59 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
+
+    // If the number of files is bigger than MAX_LOG_NUM,
+    // clean up all completed attempts per application one by one.
+    val num = listing.view(classOf[LogInfo]).index("lastProcessed").asScala.size
+    var count = num - maxNum
+    if (count > 0) {
+      logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.")
+      val oldAttempts = listing.view(classOf[ApplicationInfoWrapper])
+        .index("oldestAttempt")
+        .asScala
+      oldAttempts.foreach { app =>
+        if (count > 0) {
+          // Applications may have multiple attempts, some of which may not be completed yet.
+          val (toDelete, remaining) = app.attempts.partition(_.info.completed)
+          count -= deleteAttemptLogs(app, remaining, toDelete)
+        }
+      }
+      if (count > 0) {
+        logWarning(s"Fail to clean up according to MAX_LOG_NUM policy ($maxNum).")
+      }
+    }
+
     // Clean the blacklist from the expired entries.
     clearBlacklist(CLEAN_INTERVAL_S)
   }
 
+  private def deleteAttemptLogs(
+      app: ApplicationInfoWrapper,
+      remaining: List[AttemptInfoWrapper],
+      toDelete: List[AttemptInfoWrapper]): Int = {
+    if (remaining.nonEmpty) {
+      val newApp = new ApplicationInfoWrapper(app.info, remaining)
+      listing.write(newApp)
+    }
+
+    var countDeleted = 0
+    toDelete.foreach { attempt =>
+      logInfo(s"Deleting expired event log for ${attempt.logPath}")
+      val logPath = new Path(logDir, attempt.logPath)
+      listing.delete(classOf[LogInfo], logPath.toString())
+      cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
+      if (deleteLog(fs, logPath)) {
+        countDeleted += 1
+      }
+    }
+
+    if (remaining.isEmpty) {
+      listing.delete(app.getClass(), app.id)
+    }
+
+    countDeleted
+  }
+
   /**
    * Delete driver logs from the configured spark dfs dir that exceed the configured max age
    */
@@ -1066,12 +1100,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
   }
 
-  private def deleteLog(fs: FileSystem, log: Path): Unit = {
+  private def deleteLog(fs: FileSystem, log: Path): Boolean = {
+    var deleted = false
     if (isBlacklisted(log)) {
       logDebug(s"Skipping deleting $log as we don't have permissions on it.")
     } else {
       try {
-        fs.delete(log, true)
+        deleted = fs.delete(log, true)
       } catch {
         case _: AccessControlException =>
           logInfo(s"No permission to delete $log, ignoring.")
@@ -1079,6 +1114,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         logError(s"IOException in cleaning $log", ioe)
       }
     }
+    deleted
   }
 
   private def isCompleted(name: String): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 1d73f01cb84d..ca9af316dffd 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -49,6 +49,11 @@ private[spark] object History {
     .timeConf(TimeUnit.SECONDS)
     .createWithDefaultString("7d")
 
+  val MAX_LOG_NUM = ConfigBuilder("spark.history.fs.cleaner.maxNum")
+    .doc("The maximum number of log files in the event log directory.")
+    .intConf
+    .createWithDefault(Int.MaxValue)
+
   val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
     .doc("Local directory where to cache application history information. By default this is " +
       "not set, meaning all history information will be kept in memory.")
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 571c6e3e579b..aaf068e81db0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1185,6 +1185,56 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
   }
 
+  test("log cleaner with the maximum number of log files") {
+    val clock = new ManualClock(0)
+    (5 to 0 by -1).foreach { num =>
+      val log1_1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+      writeFile(log1_1, true, None,
+        SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+        SparkListenerApplicationEnd(2L)
+      )
+      log1_1.setLastModified(2L)
+
+      val log2_1 = newLogFile("app2", Some("attempt1"), inProgress = false)
+      writeFile(log2_1, true, None,
+        SparkListenerApplicationStart("app2", Some("app2"), 3L, "test", Some("attempt1")),
+        SparkListenerApplicationEnd(4L)
+      )
+      log2_1.setLastModified(4L)
+
+      val log3_1 = newLogFile("app3", Some("attempt1"), inProgress = false)
+      writeFile(log3_1, true, None,
+        SparkListenerApplicationStart("app3", Some("app3"), 5L, "test", Some("attempt1")),
+        SparkListenerApplicationEnd(6L)
+      )
+      log3_1.setLastModified(6L)
+
+      val log1_2_incomplete = newLogFile("app1", Some("attempt2"), inProgress = false)
+      writeFile(log1_2_incomplete, true, None,
+        SparkListenerApplicationStart("app1", Some("app1"), 7L, "test", Some("attempt2"))
+      )
+      log1_2_incomplete.setLastModified(8L)
+
+      val log3_2 = newLogFile("app3", Some("attempt2"), inProgress = false)
+      writeFile(log3_2, true, None,
+        SparkListenerApplicationStart("app3", Some("app3"), 9L, "test", Some("attempt2")),
+        SparkListenerApplicationEnd(10L)
+      )
+      log3_2.setLastModified(10L)
+
+      val provider = new FsHistoryProvider(createTestConf().set(MAX_LOG_NUM.key, s"$num"), clock)
+      updateAndCheck(provider) { list =>
+        assert(log1_1.exists() == (num > 4))
+        assert(log1_2_incomplete.exists()) // Always exists for all configurations
+
+        assert(log2_1.exists() == (num > 3))
+
+        assert(log3_1.exists() == (num > 2))
+        assert(log3_2.exists() == (num > 2))
+      }
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 4017677861a7..0f7210c3b8bb 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -190,7 +190,11 @@ Security options for the Spark History Server are covered more detail in the
   <td>spark.history.fs.cleaner.maxAge</td>
   <td>7d</td>
   <td>
-    Job history files older than this will be deleted when the filesystem history cleaner runs.
+    Files are deleted if at least one of two conditions holds.
+    First, they are deleted if they are older than <code>spark.history.fs.cleaner.maxAge</code>.
+    Second, they are deleted if the number of files is more than
+    <code>spark.history.fs.cleaner.maxNum</code>; in that case, Spark cleans up the completed
+    attempts of applications, in order of their oldest attempt time.
   </td>
 </tr>
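
For reference, a minimal sketch of how the cleaner limits touched by this patch could be combined when building a Spark configuration programmatically. `spark.history.fs.cleaner.enabled` and `spark.history.fs.cleaner.maxAge` are pre-existing settings; `spark.history.fs.cleaner.maxNum` is the key introduced here, and the `1000` threshold below is an arbitrary example value, not a recommended default.

```scala
import org.apache.spark.SparkConf

// Illustrative cleaner configuration for the history server. In practice these
// settings usually go in spark-defaults.conf rather than being set in code.
val conf = new SparkConf()
  .set("spark.history.fs.cleaner.enabled", "true") // run the periodic cleaner at all
  .set("spark.history.fs.cleaner.maxAge", "7d")    // age-based expiry (existing behavior)
  .set("spark.history.fs.cleaner.maxNum", "1000")  // size cap added by this patch (example value)
```

With both limits set, `cleanLogs()` first expires attempts older than `maxAge`, and then, if the event log directory still holds more than `maxNum` files, deletes completed attempts application by application in order of their oldest attempt time. In-progress attempts are never removed by the size cap, which is why the test above expects `log1_2_incomplete` to survive every configuration.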