Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val clock = new ManualClock(12345678)
val conf = createTestConf(inMemory = inMemory)
val provider = new FsHistoryProvider(conf, clock)
provider.start()

// Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false)
Expand Down Expand Up @@ -169,6 +170,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
}
val provider = new TestFsHistoryProvider
provider.start()

val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, None,
Expand All @@ -191,6 +193,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

test("history file is renamed from inprogress to completed") {
val provider = new FsHistoryProvider(createTestConf())
provider.start()

val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, None,
Expand All @@ -211,6 +214,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

test("Parse logs that application is not started") {
val provider = new FsHistoryProvider(createTestConf())
provider.start()

val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, None,
Expand All @@ -223,6 +227,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())
provider.start()

val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, None,
Expand All @@ -239,6 +244,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

test("apps with multiple attempts with order") {
val provider = new FsHistoryProvider(createTestConf())
provider.start()

val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
writeFile(attempt1, None,
Expand Down Expand Up @@ -439,6 +445,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
expectedLogUrlMap: Map[ExecutorInfo, Map[String, String]],
isCompletedApp: Boolean = true): Unit = {
val provider = new FsHistoryProvider(conf)
provider.start()

val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)

Expand Down Expand Up @@ -486,6 +493,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val clock = new ManualClock(maxAge / 2)
val provider = new FsHistoryProvider(
createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
provider.start()

val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log1, None,
Expand Down Expand Up @@ -532,6 +540,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val clock = new ManualClock(0)
val provider = new FsHistoryProvider(
createTestConf().set(MAX_LOG_AGE_S, maxAge / 1000), clock)
provider.start()
val log = newLogFile("inProgressApp1", None, inProgress = true)
writeFile(log, None,
SparkListenerApplicationStart(
Expand Down Expand Up @@ -571,6 +580,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val clock = new ManualClock(0)
val provider = new FsHistoryProvider(
createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
provider.start()

val log1 = newLogFile("inProgressApp1", None, inProgress = true)
writeFile(log1, None,
Expand Down Expand Up @@ -614,6 +624,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

test("Event log copy") {
val provider = new FsHistoryProvider(createTestConf())
provider.start()
val logs = (1 to 2).map { i =>
val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
writeFile(log, None,
Expand Down Expand Up @@ -659,6 +670,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
testConf.set(MAX_DRIVER_LOG_AGE_S, maxAge)
val provider = new FsHistoryProvider(testConf, clock)
provider.start()

val log1 = FileUtils.getFile(testDir, "1" + DriverLogger.DRIVER_LOG_FILE_SUFFIX)
createEmptyFile(log1)
Expand Down Expand Up @@ -705,6 +717,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

test("SPARK-8372: new logs with no app ID are ignored") {
val provider = new FsHistoryProvider(createTestConf())
provider.start()

// Write a new log file without an app id, to make sure it's ignored.
val logFile1 = newLogFile("app1", None, inProgress = true)
Expand All @@ -719,6 +732,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

test("provider correctly checks whether fs is in safe mode") {
val provider = spy(new FsHistoryProvider(createTestConf()))
provider.start()
val dfs = mock(classOf[DistributedFileSystem])
// Asserts that safe mode is false because we can't really control the return value of the mock,
// since the API is different between hadoop 1 and 2.
Expand All @@ -728,6 +742,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
test("provider waits for safe mode to finish before initializing") {
val clock = new ManualClock()
val provider = new SafeModeTestProvider(createTestConf(), clock)
provider.start()
val initThread = provider.initialize()
try {
provider.getConfig().keys should contain ("HDFS State")
Expand All @@ -750,6 +765,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
testDir.delete()
val clock = new ManualClock()
val provider = new SafeModeTestProvider(createTestConf(), clock)
provider.start()
val errorHandler = mock(classOf[Thread.UncaughtExceptionHandler])
provider.startSafeModeCheckThread(Some(errorHandler))
try {
Expand Down Expand Up @@ -791,6 +807,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
)

val provider = new FsHistoryProvider(createTestConf())
provider.start()
updateAndCheck(provider) { list =>
list.size should be (1)
list(0).name should be ("real-app")
Expand All @@ -808,6 +825,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
var provider: FsHistoryProvider = null
try {
provider = new FsHistoryProvider(conf)
provider.start()
val log = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log, None,
SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(),
Expand Down Expand Up @@ -897,6 +915,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
test("mismatched version discards old listing") {
val conf = createTestConf()
val oldProvider = new FsHistoryProvider(conf)
oldProvider.start()

val logFile1 = newLogFile("app1", None, inProgress = false)
writeFile(logFile1, None,
Expand All @@ -918,11 +937,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
oldProvider.stop()

val mistatchedVersionProvider = new FsHistoryProvider(conf)
mistatchedVersionProvider.start()
assert(mistatchedVersionProvider.listing.count(classOf[ApplicationInfoWrapper]) === 0)
}

test("invalidate cached UI") {
val provider = new FsHistoryProvider(createTestConf())
provider.start()
val appId = "new1"

// Write an incomplete app log.
Expand Down Expand Up @@ -962,6 +983,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
val clock = new ManualClock()
val provider = spy(new FsHistoryProvider(conf, clock))
provider.start()
val appId = "new1"

// Write logs for two app attempts.
Expand Down Expand Up @@ -1013,6 +1035,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val clock = new ManualClock()
val conf = createTestConf().set(MAX_LOG_AGE_S.key, s"2d")
val provider = new FsHistoryProvider(conf, clock)
provider.start()

// Create 0-byte size inprogress and complete files
var logCount = 0
Expand Down Expand Up @@ -1083,6 +1106,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {

val conf = createTestConf().set(END_EVENT_REPARSE_CHUNK_SIZE.key, s"1k")
val provider = new FsHistoryProvider(conf)
provider.start()
updateAndCheck(provider) { list =>
assert(list.size === 1)
assert(list(0).attempts.size === 1)
Expand All @@ -1095,6 +1119,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
.set(END_EVENT_REPARSE_CHUNK_SIZE, 0L)
.set(FAST_IN_PROGRESS_PARSING, false)
val provider = new FsHistoryProvider(conf)
provider.start()

val complete = newLogFile("complete", None, inProgress = false)
writeFile(complete, None,
Expand All @@ -1116,6 +1141,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
test("SPARK-24948: blacklist files we don't have read permission on") {
val clock = new ManualClock(1533132471)
val provider = new FsHistoryProvider(createTestConf(), clock)
provider.start()
val accessDenied = newLogFile("accessDenied", None, inProgress = false)
writeFile(accessDenied, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
Expand Down Expand Up @@ -1153,6 +1179,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
test("check in-progress event logs absolute length") {
val path = new Path("testapp.inprogress")
val provider = new FsHistoryProvider(createTestConf())
provider.start()
val mockedProvider = spy(provider)
val mockedFs = mock(classOf[FileSystem])
val in = mock(classOf[FSDataInputStream])
Expand Down Expand Up @@ -1224,6 +1251,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
log3_2.setLastModified(10L)

val provider = new FsHistoryProvider(createTestConf().set(MAX_LOG_NUM.key, s"$num"), clock)
provider.start()
updateAndCheck(provider) { list =>
assert(log1_1.exists() == (num > 4))
assert(log1_2_incomplete.exists()) // Always exists for all configurations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
.set(EVENT_LOG_PROCESS_TREE_METRICS, true)
conf.setAll(extraConf)
provider = new FsHistoryProvider(conf)
provider.start()
provider.checkForLogs()
val securityManager = HistoryServer.createSecurityManager(conf)

Expand Down Expand Up @@ -426,6 +427,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.remove(IS_TESTING)
val provider = new FsHistoryProvider(myConf)
provider.start()
val securityManager = HistoryServer.createSecurityManager(myConf)

sc = new SparkContext("local", "test", myConf)
Expand Down