diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index a3776b3ad756..f560b7e9157b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -20,13 +20,12 @@ package org.apache.spark.deploy.history import java.io.{File, FileNotFoundException, IOException} import java.lang.{Long => JLong} import java.nio.file.Files -import java.util.{Date, ServiceLoader} +import java.util.{Date, NoSuchElementException, ServiceLoader} import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit} import java.util.zip.ZipOutputStream import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.concurrent.ExecutionException import scala.io.Source import scala.xml.Node @@ -160,6 +159,26 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) new HistoryServerDiskManager(conf, path, listing, clock) } + // Used to store the paths, which are being processed. This enable the replay log tasks execute + // asynchronously and make sure that checkForLogs would not process a path repeatedly. + private val processing = ConcurrentHashMap.newKeySet[String] + + private def isProcessing(path: Path): Boolean = { + processing.contains(path.getName) + } + + private def isProcessing(info: LogInfo): Boolean = { + processing.contains(info.logPath.split("/").last) + } + + private def processing(path: Path): Unit = { + processing.add(path.getName) + } + + private def endProcessing(path: Path): Unit = { + processing.remove(path.getName) + } + private val blacklist = new ConcurrentHashMap[String, Long] // Visible for testing @@ -440,6 +459,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) .filter { entry => !isBlacklisted(entry.getPath) } + .filter { entry => !isProcessing(entry.getPath) } .flatMap { entry => EventLogFileReader(fs, entry) } .filter { reader => try { @@ -512,43 +532,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}") } - val tasks = updated.flatMap { entry => + updated.foreach { entry => + processing(entry.rootPath) try { - val task: Future[Unit] = replayExecutor.submit( - () => mergeApplicationListing(entry, newLastScanTime, true)) - Some(task -> entry.rootPath) + val task: Runnable = () => mergeApplicationListing(entry, newLastScanTime, true) + replayExecutor.submit(task) } catch { // let the iteration over the updated entries break, since an exception on // replayExecutor.submit (..) indicates the ExecutorService is unable // to take any more submissions at this time case e: Exception => logError(s"Exception while submitting event log for replay", e) - None - } - } - - pendingReplayTasksCount.addAndGet(tasks.size) - - // Wait for all tasks to finish. This makes sure that checkForLogs - // is not scheduled again while some tasks are already running in - // the replayExecutor. - tasks.foreach { case (task, path) => - try { - task.get() - } catch { - case e: InterruptedException => - throw e - case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] => - // We don't have read permissions on the log file - logWarning(s"Unable to read log $path", e.getCause) - blacklist(path) - // SPARK-28157 We should remove this blacklisted entry from the KVStore - // to handle permission-only changes with the same file sizes later. - listing.delete(classOf[LogInfo], path.toString) - case e: Exception => - logError("Exception while merging application listings", e) - } finally { - pendingReplayTasksCount.decrementAndGet() } } @@ -563,7 +557,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .last(newLastScanTime - 1) .asScala .toList - stale.foreach { log => + stale.filterNot(isProcessing).foreach { log => log.appId.foreach { appId => cleanAppData(appId, log.attemptId, log.logPath) listing.delete(classOf[LogInfo], log.logPath) @@ -664,10 +658,39 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + private def mergeApplicationListing( + reader: EventLogFileReader, + scanTime: Long, + enableOptimizations: Boolean): Unit = { + try { + pendingReplayTasksCount.incrementAndGet() + doMergeApplicationListing(reader, scanTime, enableOptimizations) + if (conf.get(CLEANER_ENABLED)) { + checkAndCleanLog(reader.rootPath.toString) + } + } catch { + case e: InterruptedException => + throw e + case e: AccessControlException => + // We don't have read permissions on the log file + logWarning(s"Unable to read log ${reader.rootPath}", e) + blacklist(reader.rootPath) + // SPARK-28157 We should remove this blacklisted entry from the KVStore + // to handle permission-only changes with the same file sizes later. + listing.delete(classOf[LogInfo], reader.rootPath.toString) + case e: Exception => + logError("Exception while merging application listings", e) + } finally { + endProcessing(reader.rootPath) + pendingReplayTasksCount.decrementAndGet() + } + } + /** * Replay the given log file, saving the application in the listing db. + * Visable for testing */ - protected def mergeApplicationListing( + private[history] def doMergeApplicationListing( reader: EventLogFileReader, scanTime: Long, enableOptimizations: Boolean): Unit = { @@ -773,7 +796,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // mean the end event is before the configured threshold, so call the method again to // re-parse the whole log. logInfo(s"Reparsing $logPath since end event was not found.") - mergeApplicationListing(reader, scanTime, enableOptimizations = false) + doMergeApplicationListing(reader, scanTime, enableOptimizations = false) case _ => // If the app hasn't written down its app ID to the logs, still record the entry in the @@ -798,6 +821,30 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } + /** + * Check and delete specified event log according to the max log age defined by the user. + */ + private[history] def checkAndCleanLog(logPath: String): Unit = Utils.tryLog { + val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 + val log = listing.read(classOf[LogInfo], logPath) + + if (log.lastProcessed <= maxTime && log.appId.isEmpty) { + logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") + deleteLog(fs, new Path(log.logPath)) + listing.delete(classOf[LogInfo], log.logPath) + } + + log.appId.foreach { appId => + val app = listing.read(classOf[ApplicationInfoWrapper], appId) + if (app.oldestAttempt() <= maxTime) { + val (remaining, toDelete) = app.attempts.partition { attempt => + attempt.info.lastUpdated.getTime() >= maxTime + } + deleteAttemptLogs(app, remaining, toDelete) + } + } + } + /** * Delete event logs from the log directory according to the clean policy defined by the user. */ @@ -827,7 +874,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .asScala .filter { l => l.logType == null || l.logType == LogType.EventLogs } .toList - stale.foreach { log => + stale.filterNot(isProcessing).foreach { log => if (log.appId.isEmpty) { logInfo(s"Deleting invalid / corrupt event log ${log.logPath}") deleteLog(fs, new Path(log.logPath)) @@ -935,7 +982,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) .asScala .filter { l => l.logType != null && l.logType == LogType.DriverLogs } .toList - stale.foreach { log => + stale.filterNot(isProcessing).foreach { log => logInfo(s"Deleting invalid driver log ${log.logPath}") listing.delete(classOf[LogInfo], log.logPath) deleteLog(driverLogFs, new Path(log.logPath)) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index ed195dd44e91..eed42c546dcf 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -160,13 +160,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { assume(!Utils.isWindows) class TestFsHistoryProvider extends FsHistoryProvider(createTestConf()) { - var mergeApplicationListingCall = 0 - override protected def mergeApplicationListing( + var doMergeApplicationListingCall = 0 + override private[history] def doMergeApplicationListing( reader: EventLogFileReader, lastSeen: Long, enableSkipToEnd: Boolean): Unit = { - super.mergeApplicationListing(reader, lastSeen, enableSkipToEnd) - mergeApplicationListingCall += 1 + super.doMergeApplicationListing(reader, lastSeen, enableSkipToEnd) + doMergeApplicationListingCall += 1 } } val provider = new TestFsHistoryProvider @@ -187,7 +187,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { list.size should be (1) } - provider.mergeApplicationListingCall should be (1) + provider.doMergeApplicationListingCall should be (1) } test("history file is renamed from inprogress to completed") { @@ -1321,6 +1321,35 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { assertSerDe(serializer, attemptInfoWithIndex) } + test("SPARK-29043: clean up specified event log") { + val clock = new ManualClock() + val conf = createTestConf().set(MAX_LOG_AGE_S.key, "0").set(CLEANER_ENABLED.key, "true") + val provider = new FsHistoryProvider(conf, clock) + + // create an invalid application log file + val inValidLogFile = newLogFile("inValidLogFile", None, inProgress = true) + inValidLogFile.createNewFile() + writeFile(inValidLogFile, None, + SparkListenerApplicationStart(inValidLogFile.getName, None, 1L, "test", None)) + inValidLogFile.setLastModified(clock.getTimeMillis()) + + // create a valid application log file + val validLogFile = newLogFile("validLogFile", None, inProgress = true) + validLogFile.createNewFile() + writeFile(validLogFile, None, + SparkListenerApplicationStart(validLogFile.getName, Some("local_123"), 1L, "test", None)) + validLogFile.setLastModified(clock.getTimeMillis()) + + provider.checkForLogs() + // The invalid application log file would be cleaned by checkAndCleanLog(). + assert(new File(testDir.toURI).listFiles().size === 1) + + clock.advance(1) + // cleanLogs() would clean the valid application log file. + provider.cleanLogs() + assert(new File(testDir.toURI).listFiles().size === 0) + } + private def assertOptionAfterSerde(opt: Option[Long], expected: Option[Long]): Unit = { if (expected.isEmpty) { assert(opt.isEmpty)