@@ -805,6 +805,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
*/
private[history] def cleanLogs(): Unit = Utils.tryLog {
val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
val maxNum = conf.get(MAX_LOG_NUM)

val expired = listing.view(classOf[ApplicationInfoWrapper])
.index("oldestAttempt")
@@ -817,23 +818,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val (remaining, toDelete) = app.attempts.partition { attempt =>
attempt.info.lastUpdated.getTime() >= maxTime
}

if (remaining.nonEmpty) {
val newApp = new ApplicationInfoWrapper(app.info, remaining)
listing.write(newApp)
}

toDelete.foreach { attempt =>
logInfo(s"Deleting expired event log for ${attempt.logPath}")
val logPath = new Path(logDir, attempt.logPath)
listing.delete(classOf[LogInfo], logPath.toString())
cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
deleteLog(fs, logPath)
}

if (remaining.isEmpty) {
listing.delete(app.getClass(), app.id)
}
deleteAttemptLogs(app, remaining, toDelete)
}

// Delete log files that don't have a valid application and exceed the configured max age.
@@ -851,10 +836,59 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.delete(classOf[LogInfo], log.logPath)
}
}

// If the number of log files exceeds MAX_LOG_NUM, delete completed attempts,
// one application at a time (oldest attempt first), until the limit is met.
val num = listing.view(classOf[LogInfo]).index("lastProcessed").asScala.size
var count = num - maxNum
if (count > 0) {
logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.")
val oldAttempts = listing.view(classOf[ApplicationInfoWrapper])
.index("oldestAttempt")
.asScala
oldAttempts.foreach { app =>
if (count > 0) {
// Applications may have multiple attempts, some of which may not be completed yet.
val (toDelete, remaining) = app.attempts.partition(_.info.completed)
count -= deleteAttemptLogs(app, remaining, toDelete)
}
}
if (count > 0) {
logWarning(s"Fail to clean up according to MAX_LOG_NUM policy ($maxNum).")
}
}

// Clean the blacklist from the expired entries.
clearBlacklist(CLEAN_INTERVAL_S)
}

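/**
 * Deletes the log files of the given attempts, updates the listing for the attempts that
 * remain, and drops the application entry entirely when nothing remains. Returns the
 * number of log files actually deleted from the file system.
 */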
private def deleteAttemptLogs(
app: ApplicationInfoWrapper,
remaining: List[AttemptInfoWrapper],
toDelete: List[AttemptInfoWrapper]): Int = {
if (remaining.nonEmpty) {
val newApp = new ApplicationInfoWrapper(app.info, remaining)
listing.write(newApp)
}

var countDeleted = 0
toDelete.foreach { attempt =>
logInfo(s"Deleting expired event log for ${attempt.logPath}")
val logPath = new Path(logDir, attempt.logPath)
listing.delete(classOf[LogInfo], logPath.toString())
cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
if (deleteLog(fs, logPath)) {
countDeleted += 1
}
}

if (remaining.isEmpty) {
listing.delete(app.getClass(), app.id)
}

countDeleted
}

/**
* Delete driver logs from the configured spark dfs dir that exceed the configured max age
*/
@@ -1066,19 +1100,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId."))
}

private def deleteLog(fs: FileSystem, log: Path): Unit = {
private def deleteLog(fs: FileSystem, log: Path): Boolean = {
var deleted = false
if (isBlacklisted(log)) {
logDebug(s"Skipping deleting $log as we don't have permissions on it.")
} else {
try {
fs.delete(log, true)
deleted = fs.delete(log, true)
} catch {
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
}
}
deleted
}

private def isCompleted(name: String): Boolean = {
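Aside, not part of the patch: the retention arithmetic in cleanLogs() can be traced with a small standalone sketch. MaxLogNumSketch, App, and Attempt below are made-up stand-ins for the listing entries, and the sample data mirrors the test added further down (five logs, one of them still in progress).

// Illustrative only: a toy model of the MAX_LOG_NUM accounting in cleanLogs().
// App/Attempt are invented stand-ins for ApplicationInfoWrapper/AttemptInfoWrapper.
object MaxLogNumSketch {
  case class Attempt(id: String, completed: Boolean)
  case class App(name: String, attempts: List[Attempt])

  def main(args: Array[String]): Unit = {
    val maxNum = 3
    // Applications ordered by their oldest attempt, mirroring the "oldestAttempt" index.
    val apps = List(
      App("app1", List(Attempt("attempt1", completed = true),
        Attempt("attempt2", completed = false))), // still in progress, never deleted
      App("app2", List(Attempt("attempt1", completed = true))),
      App("app3", List(Attempt("attempt1", completed = true),
        Attempt("attempt2", completed = true))))

    var count = apps.map(_.attempts.size).sum - maxNum // 5 logs - cap of 3 = 2 to delete
    val survivors = apps.flatMap { app =>
      if (count > 0) {
        // Same rule as the patch: drop every completed attempt of the oldest apps first.
        val (toDelete, remaining) = app.attempts.partition(_.completed)
        count -= toDelete.size
        remaining.map(a => s"${app.name}/${a.id}")
      } else {
        app.attempts.map(a => s"${app.name}/${a.id}")
      }
    }
    // Prints app1/attempt2, app3/attempt1, app3/attempt2.
    survivors.foreach(println)
  }
}

With a cap of 3, the two oldest completed attempts (app1/attempt1 and app2/attempt1) are dropped, which matches the num = 3 case asserted in the suite below.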
@@ -49,6 +49,11 @@ private[spark] object History {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("7d")

val MAX_LOG_NUM = ConfigBuilder("spark.history.fs.cleaner.maxNum")
.doc("The maximum number of log files in the event log directory.")
.intConf
.createWithDefault(Int.MaxValue)

val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
.doc("Local directory where to cache application history information. By default this is " +
"not set, meaning all history information will be kept in memory.")
@@ -1185,6 +1185,56 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus))
}

test("log cleaner with the maximum number of log files") {
val clock = new ManualClock(0)
(5 to 0 by -1).foreach { num =>
val log1_1 = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log1_1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
SparkListenerApplicationEnd(2L)
)
log1_1.setLastModified(2L)

val log2_1 = newLogFile("app2", Some("attempt1"), inProgress = false)
writeFile(log2_1, true, None,
SparkListenerApplicationStart("app2", Some("app2"), 3L, "test", Some("attempt1")),
SparkListenerApplicationEnd(4L)
)
log2_1.setLastModified(4L)

val log3_1 = newLogFile("app3", Some("attempt1"), inProgress = false)
writeFile(log3_1, true, None,
SparkListenerApplicationStart("app3", Some("app3"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
)
log3_1.setLastModified(6L)

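// The next attempt writes no SparkListenerApplicationEnd event, so the provider treats it
// as incomplete even though the file name is not marked in-progress; incomplete attempts
// are never deleted by the maxNum policy.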
val log1_2_incomplete = newLogFile("app1", Some("attempt2"), inProgress = false)
writeFile(log1_2_incomplete, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 7L, "test", Some("attempt2"))
)
log1_2_incomplete.setLastModified(8L)

val log3_2 = newLogFile("app3", Some("attempt2"), inProgress = false)
writeFile(log3_2, true, None,
SparkListenerApplicationStart("app3", Some("app3"), 9L, "test", Some("attempt2")),
SparkListenerApplicationEnd(10L)
)
log3_2.setLastModified(10L)

val provider = new FsHistoryProvider(createTestConf().set(MAX_LOG_NUM.key, s"$num"), clock)
updateAndCheck(provider) { list =>
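// With five log files and the cap set to `num`, the cleaner needs to delete (5 - num)
// of them. It walks applications in oldest-attempt order (app1, app2, app3) and removes
// all completed attempts of each application until the deficit is covered, so app3's two
// attempts are always deleted together. log1_2_incomplete has no end event and is never
// eligible for deletion.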
assert(log1_1.exists() == (num > 4))
assert(log1_2_incomplete.exists()) // Always exists for all configurations

assert(log2_1.exists() == (num > 3))

assert(log3_1.exists() == (num > 2))
assert(log3_2.exists() == (num > 2))
}
}
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
docs/monitoring.md (15 additions, 1 deletion)
@@ -190,7 +190,11 @@ Security options for the Spark History Server are covered more detail in the
<td>1d</td>
<td>
How often the filesystem job history cleaner checks for files to delete.
Files are only deleted if they are older than <code>spark.history.fs.cleaner.maxAge</code>
Files are deleted when at least one of two conditions holds.
First, they are deleted if they are older than <code>spark.history.fs.cleaner.maxAge</code>.
Second, if the number of files exceeds <code>spark.history.fs.cleaner.maxNum</code>,
Spark deletes the completed attempts of each application, starting from the applications
with the oldest attempts, until the directory is back under the limit.
</td>
</tr>
<tr>
@@ -200,6 +204,16 @@ Security options for the Spark History Server are covered more detail in the
Job history files older than this will be deleted when the filesystem history cleaner runs.
</td>
</tr>
<tr>
<td>spark.history.fs.cleaner.maxNum</td>
<td>Int.MaxValue</td>
<td>
The maximum number of files in the event log directory.
Spark tries to clean up the completed attempt logs to keep the log directory under this limit.
This should be smaller than the underlying file system's own limit, such as
<code>dfs.namenode.fs-limits.max-directory-items</code> in HDFS.
</td>
</tr>
<tr>
<td>spark.history.fs.endEventReparseChunkSize</td>
<td>1m</td>