Skip to content

Commit 5bad2af

Browse files
author
Marcelo Vanzin
committed
More feedback.
1 parent 388858b commit 5bad2af

File tree

2 files changed

+42
-14
lines changed

2 files changed

+42
-14
lines changed

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
452452
override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
453453
})
454454
} catch {
455-
// let the iteration over logInfos break, since an exception on
455+
// let the iteration over the updated entries break, since an exception on
456456
// replayExecutor.submit (..) indicates the ExecutorService is unable
457457
// to take any more submissions at this time
458458
case e: Exception =>
@@ -636,6 +636,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
636636
(Some(app.info.id), app.attempts.head.info.attemptId)
637637

638638
case _ =>
639+
// If the app hasn't written down its app ID to the logs, still record the entry in the
640+
// listing db, with an empty ID. This will make the log eligible for deletion if the app
641+
// does not make progress after the configured max log age.
639642
(None, None)
640643
}
641644
listing.write(LogInfo(logPath.toString(), scanTime, appId, attemptId, fileStatus.getLen()))
@@ -904,6 +907,11 @@ private[history] case class FsHistoryProviderMetadata(
904907
uiVersion: Long,
905908
logDir: String)
906909

910+
/**
911+
* Tracking info for event logs detected in the configured log directory. Tracks both valid and
912+
* invalid logs (e.g. unparseable logs, recorded as logs with no app ID) so that the cleaner
913+
* can know what log files are safe to delete.
914+
*/
907915
private[history] case class LogInfo(
908916
@KVIndexParam logPath: String,
909917
@KVIndexParam("lastProcessed") lastProcessed: Long,

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -716,42 +716,62 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
716716
}
717717

718718
test("SPARK-21571: clean up removes invalid history files") {
719+
// TODO: "maxTime" becoming negative in cleanLogs() causes this test to fail, so avoid that
720+
// until we figure out what's causing the problem.
719721
val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
720-
val conf = createTestConf().set("spark.history.fs.cleaner.maxAge", s"2d")
722+
val conf = createTestConf().set(MAX_LOG_AGE_S.key, s"2d")
721723
val provider = new FsHistoryProvider(conf, clock) {
722724
override def getNewLastScanTime(): Long = clock.getTimeMillis()
723725
}
724726

725727
// Create 0-byte size inprogress and complete files
726-
val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress = true)
727-
logfile1.createNewFile()
728-
logfile1.setLastModified(clock.getTimeMillis())
728+
var logCount = 0
729+
var validLogCount = 0
729730

730-
val logfile2 = newLogFile("emptyFinishedLogFile", None, inProgress = false)
731-
logfile2.createNewFile()
732-
logfile2.setLastModified(clock.getTimeMillis())
731+
val emptyInProgress = newLogFile("emptyInprogressLogFile", None, inProgress = true)
732+
emptyInProgress.createNewFile()
733+
emptyInProgress.setLastModified(clock.getTimeMillis())
734+
logCount += 1
735+
736+
val slowApp = newLogFile("slowApp", None, inProgress = true)
737+
slowApp.createNewFile()
738+
slowApp.setLastModified(clock.getTimeMillis())
739+
logCount += 1
740+
741+
val emptyFinished = newLogFile("emptyFinishedLogFile", None, inProgress = false)
742+
emptyFinished.createNewFile()
743+
emptyFinished.setLastModified(clock.getTimeMillis())
744+
logCount += 1
733745

734746
// Create an incomplete log file, has an end record but no start record.
735-
val logfile3 = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false)
736-
writeFile(logfile3, true, None, SparkListenerApplicationEnd(0))
737-
logfile3.setLastModified(clock.getTimeMillis())
747+
val corrupt = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false)
748+
writeFile(corrupt, true, None, SparkListenerApplicationEnd(0))
749+
corrupt.setLastModified(clock.getTimeMillis())
750+
logCount += 1
738751

739752
provider.checkForLogs()
740753
provider.cleanLogs()
741-
assert(new File(testDir.toURI).listFiles().size === 3)
754+
assert(new File(testDir.toURI).listFiles().size === logCount)
742755

743756
// Move the clock forward 1 day and scan the files again. They should still be there.
744757
clock.advance(TimeUnit.DAYS.toMillis(1))
745758
provider.checkForLogs()
746759
provider.cleanLogs()
747-
assert(new File(testDir.toURI).listFiles().size === 3)
760+
assert(new File(testDir.toURI).listFiles().size === logCount)
761+
762+
// Update the slow app to contain valid info. Code should detect the change and not clean
763+
// it up.
764+
writeFile(slowApp, true, None,
765+
SparkListenerApplicationStart(slowApp.getName(), Some(slowApp.getName()), 1L, "test", None))
766+
slowApp.setLastModified(clock.getTimeMillis())
767+
validLogCount += 1
748768

749769
// Move the clock forward another 2 days and scan the files again. This time the cleaner should
750770
// pick up the invalid files and get rid of them.
751771
clock.advance(TimeUnit.DAYS.toMillis(2))
752772
provider.checkForLogs()
753773
provider.cleanLogs()
754-
assert(new File(testDir.toURI).listFiles().size === 0)
774+
assert(new File(testDir.toURI).listFiles().size === validLogCount)
755775
}
756776

757777
/**

0 commit comments

Comments (0)