diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 30261dde678f1..0998e385a4b1d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -90,6 +90,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val clock = new ManualClock(12345678) val conf = createTestConf(inMemory = inMemory) val provider = new FsHistoryProvider(conf, clock) + provider.start() // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) @@ -169,6 +170,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } } val provider = new TestFsHistoryProvider + provider.start() val logFile1 = newLogFile("new1", None, inProgress = false) writeFile(logFile1, None, @@ -191,6 +193,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("history file is renamed from inprogress to completed") { val provider = new FsHistoryProvider(createTestConf()) + provider.start() val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, None, @@ -211,6 +214,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("Parse logs that application is not started") { val provider = new FsHistoryProvider(createTestConf()) + provider.start() val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, None, @@ -223,6 +227,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("SPARK-5582: empty log directory") { val provider = new FsHistoryProvider(createTestConf()) + provider.start() val logFile1 = newLogFile("app1", None, inProgress = true) writeFile(logFile1, None, @@ -239,6 +244,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("apps with multiple attempts with order") { val provider = new FsHistoryProvider(createTestConf()) + provider.start() val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) writeFile(attempt1, None, @@ -439,6 +445,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { expectedLogUrlMap: Map[ExecutorInfo, Map[String, String]], isCompletedApp: Boolean = true): Unit = { val provider = new FsHistoryProvider(conf) + provider.start() val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) @@ -486,6 +493,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val clock = new ManualClock(maxAge / 2) val provider = new FsHistoryProvider( createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock) + provider.start() val log1 = newLogFile("app1", Some("attempt1"), inProgress = false) writeFile(log1, None, @@ -532,6 +540,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val clock = new ManualClock(0) val provider = new FsHistoryProvider( createTestConf().set(MAX_LOG_AGE_S, maxAge / 1000), clock) + provider.start() val log = newLogFile("inProgressApp1", None, inProgress = true) writeFile(log, None, SparkListenerApplicationStart( @@ -571,6 +580,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val clock = new ManualClock(0) val provider = new FsHistoryProvider( createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock) + provider.start() val log1 = newLogFile("inProgressApp1", None, inProgress = true) writeFile(log1, None, @@ -614,6 +624,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("Event log copy") { val provider = new FsHistoryProvider(createTestConf()) + provider.start() val logs = (1 to 2).map { i => val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false) writeFile(log, None, @@ -659,6 +670,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4) testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge) val provider = new FsHistoryProvider(testConf, clock) + provider.start() val log1 = FileUtils.getFile(testDir, "1" + DriverLogger.DRIVER_LOG_FILE_SUFFIX) createEmptyFile(log1) @@ -705,6 +717,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("SPARK-8372: new logs with no app ID are ignored") { val provider = new FsHistoryProvider(createTestConf()) + provider.start() // Write a new log file without an app id, to make sure it's ignored. val logFile1 = newLogFile("app1", None, inProgress = true) @@ -719,6 +732,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("provider correctly checks whether fs is in safe mode") { val provider = spy(new FsHistoryProvider(createTestConf())) + provider.start() val dfs = mock(classOf[DistributedFileSystem]) // Asserts that safe mode is false because we can't really control the return value of the mock, // since the API is different between hadoop 1 and 2. @@ -728,6 +742,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("provider waits for safe mode to finish before initializing") { val clock = new ManualClock() val provider = new SafeModeTestProvider(createTestConf(), clock) + provider.start() val initThread = provider.initialize() try { provider.getConfig().keys should contain ("HDFS State") @@ -750,6 +765,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { testDir.delete() val clock = new ManualClock() val provider = new SafeModeTestProvider(createTestConf(), clock) + provider.start() val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler]) provider.startSafeModeCheckThread(Some(errorHandler)) try { @@ -791,6 +807,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { ) val provider = new FsHistoryProvider(createTestConf()) + provider.start() updateAndCheck(provider) { list => list.size should be (1) list(0).name should be ("real-app") @@ -808,6 +825,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { var provider: FsHistoryProvider = null try { provider = new FsHistoryProvider(conf) + provider.start() val log = newLogFile("app1", Some("attempt1"), inProgress = false) writeFile(log, None, SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(), @@ -897,6 +915,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("mismatched version discards old listing") { val conf = createTestConf() val oldProvider = new FsHistoryProvider(conf) + oldProvider.start() val logFile1 = newLogFile("app1", None, inProgress = false) writeFile(logFile1, None, @@ -918,11 +937,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { oldProvider.stop() val mistatchedVersionProvider = new FsHistoryProvider(conf) + mistatchedVersionProvider.start() assert(mistatchedVersionProvider.listing.count(classOf[ApplicationInfoWrapper]) === 0) } test("invalidate cached UI") { val provider = new FsHistoryProvider(createTestConf()) + provider.start() val appId = "new1" // Write an incomplete app log. @@ -962,6 +983,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) val clock = new ManualClock() val provider = spy(new FsHistoryProvider(conf, clock)) + provider.start() val appId = "new1" // Write logs for two app attempts. @@ -1013,6 +1035,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val clock = new ManualClock() val conf = createTestConf().set(MAX_LOG_AGE_S.key, s"2d") val provider = new FsHistoryProvider(conf, clock) + provider.start() // Create 0-byte size inprogress and complete files var logCount = 0 @@ -1083,6 +1106,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val conf = createTestConf().set(END_EVENT_REPARSE_CHUNK_SIZE.key, s"1k") val provider = new FsHistoryProvider(conf) + provider.start() updateAndCheck(provider) { list => assert(list.size === 1) assert(list(0).attempts.size === 1) @@ -1095,6 +1119,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { .set(END_EVENT_REPARSE_CHUNK_SIZE, 0L) .set(FAST_IN_PROGRESS_PARSING, false) val provider = new FsHistoryProvider(conf) + provider.start() val complete = newLogFile("complete", None, inProgress = false) writeFile(complete, None, @@ -1116,6 +1141,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("SPARK-24948: blacklist files we don't have read permission on") { val clock = new ManualClock(1533132471) val provider = new FsHistoryProvider(createTestConf(), clock) + provider.start() val accessDenied = newLogFile("accessDenied", None, inProgress = false) writeFile(accessDenied, None, SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None)) @@ -1153,6 +1179,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("check in-progress event logs absolute length") { val path = new Path("testapp.inprogress") val provider = new FsHistoryProvider(createTestConf()) + provider.start() val mockedProvider = spy(provider) val mockedFs = mock(classOf[FileSystem]) val in = mock(classOf[FSDataInputStream]) @@ -1224,6 +1251,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { log3_2.setLastModified(10L) val provider = new FsHistoryProvider(createTestConf().set(MAX_LOG_NUM.key, s"$num"), clock) + provider.start() updateAndCheck(provider) { list => assert(log1_1.exists() == (num > 4)) assert(log1_2_incomplete.exists()) // Always exists for all configurations diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 71a127bd4b9f6..417f81438357d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -88,6 +88,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set(EVENT_LOG_PROCESS_TREE_METRICS, true) conf.setAll(extraConf) provider = new FsHistoryProvider(conf) + provider.start() provider.checkForLogs() val securityManager = HistoryServer.createSecurityManager(conf) @@ -426,6 +427,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) .remove(IS_TESTING) val provider = new FsHistoryProvider(myConf) + provider.start() val securityManager = HistoryServer.createSecurityManager(myConf) sc = new SparkContext("local", "test", myConf)