diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index aaf068e81db0a..30261dde678f1 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -37,7 +37,7 @@ import org.mockito.Mockito.{doThrow, mock, spy, verify, when} import org.scalatest.Matchers import org.scalatest.concurrent.Eventually._ -import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR import org.apache.spark.internal.config.History._ @@ -75,7 +75,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { inProgress: Boolean, codec: Option[String] = None): File = { val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else "" - val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId) + val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId, codec) val logPath = new Path(logUri).toUri.getPath + ip new File(logPath) } @@ -88,11 +88,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { private def testAppLogParsing(inMemory: Boolean) { val clock = new ManualClock(12345678) - val provider = new FsHistoryProvider(createTestConf(inMemory = inMemory), clock) + val conf = createTestConf(inMemory = inMemory) + val provider = new FsHistoryProvider(conf, clock) // Write a new-style application log. val newAppComplete = newLogFile("new1", None, inProgress = false) - writeFile(newAppComplete, true, None, + writeFile(newAppComplete, None, SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test", None), SparkListenerApplicationEnd(5L) @@ -101,14 +102,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { // Write a new-style application log. val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false, Some("lzf")) - writeFile(newAppCompressedComplete, true, None, + writeFile(newAppCompressedComplete, Some(CompressionCodec.createCodec(conf, "lzf")), SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"), 1L, "test", None), SparkListenerApplicationEnd(4L)) // Write an unfinished app, new-style. val newAppIncomplete = newLogFile("new2", None, inProgress = true) - writeFile(newAppIncomplete, true, None, + writeFile(newAppIncomplete, None, SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test", None) ) @@ -131,7 +132,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val duration = if (end > 0) end - start else 0 new ApplicationInfo(id, name, None, None, None, None, List(ApplicationAttemptInfo(None, new Date(start), - new Date(end), new Date(lastMod), duration, user, completed, ""))) + new Date(end), new Date(lastMod), duration, user, completed, SPARK_VERSION))) } // For completed files, lastUpdated would be lastModified time. @@ -170,12 +171,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val provider = new TestFsHistoryProvider val logFile1 = newLogFile("new1", None, inProgress = false) - writeFile(logFile1, true, None, + writeFile(logFile1, None, SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None), SparkListenerApplicationEnd(2L) ) val logFile2 = newLogFile("new2", None, inProgress = false) - writeFile(logFile2, true, None, + writeFile(logFile2, None, SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None), SparkListenerApplicationEnd(2L) ) @@ -192,7 +193,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val provider = new FsHistoryProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) - writeFile(logFile1, true, None, + writeFile(logFile1, None, SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None), SparkListenerApplicationEnd(2L) ) @@ -212,7 +213,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val provider = new FsHistoryProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) - writeFile(logFile1, true, None, + writeFile(logFile1, None, SparkListenerLogStart("1.4") ) updateAndCheck(provider) { list => @@ -224,7 +225,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val provider = new FsHistoryProvider(createTestConf()) val logFile1 = newLogFile("app1", None, inProgress = true) - writeFile(logFile1, true, None, + writeFile(logFile1, None, SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None), SparkListenerApplicationEnd(2L)) @@ -240,7 +241,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val provider = new FsHistoryProvider(createTestConf()) val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true) - writeFile(attempt1, true, None, + writeFile(attempt1, None, SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")) ) @@ -250,7 +251,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true) - writeFile(attempt2, true, None, + writeFile(attempt2, None, SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2")) ) @@ -261,7 +262,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false) - writeFile(attempt3, true, None, + writeFile(attempt3, None, SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")), SparkListenerApplicationEnd(4L) ) @@ -273,7 +274,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false) - writeFile(app2Attempt1, true, None, + writeFile(app2Attempt1, None, SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")), SparkListenerApplicationEnd(6L) ) @@ -448,7 +449,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { "test", Some("attempt1"))) ++ executorAddedEvents ++ (if (isCompletedApp) List(SparkListenerApplicationEnd(1000L)) else Seq()) - writeFile(attempt1, true, None, allEvents: _*) + writeFile(attempt1, None, allEvents: _*) updateAndCheck(provider) { list => list.size should be (1) @@ -487,14 +488,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock) val log1 = newLogFile("app1", Some("attempt1"), inProgress = false) - writeFile(log1, true, None, + writeFile(log1, None, SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")), SparkListenerApplicationEnd(2L) ) log1.setLastModified(0L) val log2 = newLogFile("app1", Some("attempt2"), inProgress = false) - writeFile(log2, true, None, + writeFile(log2, None, SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")), SparkListenerApplicationEnd(4L) ) @@ -532,14 +533,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val provider = new FsHistoryProvider( createTestConf().set(MAX_LOG_AGE_S, maxAge / 1000), clock) val log = newLogFile("inProgressApp1", None, inProgress = true) - writeFile(log, true, None, + writeFile(log, None, SparkListenerApplicationStart( "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")) ) clock.setTime(firstFileModifiedTime) log.setLastModified(clock.getTimeMillis()) provider.checkForLogs() - writeFile(log, true, None, + writeFile(log, None, SparkListenerApplicationStart( "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")), SparkListenerJobStart(0, 1L, Nil, null) @@ -549,7 +550,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { log.setLastModified(clock.getTimeMillis()) provider.checkForLogs() clock.setTime(TimeUnit.DAYS.toMillis(10)) - writeFile(log, true, None, + writeFile(log, None, SparkListenerApplicationStart( "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")), SparkListenerJobStart(0, 1L, Nil, null), @@ -572,7 +573,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock) val log1 = newLogFile("inProgressApp1", None, inProgress = true) - writeFile(log1, true, None, + writeFile(log1, None, SparkListenerApplicationStart( "inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")) ) @@ -581,7 +582,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { provider.checkForLogs() val log2 = newLogFile("inProgressApp2", None, inProgress = true) - writeFile(log2, true, None, + writeFile(log2, None, SparkListenerApplicationStart( "inProgressApp2", Some("inProgressApp2"), 23L, "test2", Some("attempt2")) ) @@ -615,7 +616,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val provider = new FsHistoryProvider(createTestConf()) val logs = (1 to 2).map { i => val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false) - writeFile(log, true, None, + writeFile(log, None, SparkListenerApplicationStart( "downloadApp1", Some("downloadApp1"), 5000L * i, "test", Some(s"attempt$i")), SparkListenerApplicationEnd(5001L * i) @@ -707,7 +708,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { // Write a new log file without an app id, to make sure it's ignored. val logFile1 = newLogFile("app1", None, inProgress = true) - writeFile(logFile1, true, None, + writeFile(logFile1, None, SparkListenerLogStart("1.4") ) @@ -783,7 +784,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { // and write one real file, which should still get picked up just fine val newAppComplete = newLogFile("real-app", None, inProgress = false) - writeFile(newAppComplete, true, None, + writeFile(newAppComplete, None, SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test", None), SparkListenerApplicationEnd(5L) @@ -808,7 +809,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { try { provider = new FsHistoryProvider(conf) val log = newLogFile("app1", Some("attempt1"), inProgress = false) - writeFile(log, true, None, + writeFile(log, None, SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(), "test", Some("attempt1")), SparkListenerEnvironmentUpdate(Map( @@ -898,7 +899,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val oldProvider = new FsHistoryProvider(conf) val logFile1 = newLogFile("app1", None, inProgress = false) - writeFile(logFile1, true, None, + writeFile(logFile1, None, SparkListenerLogStart("2.3"), SparkListenerApplicationStart("test", Some("test"), 1L, "test", None), SparkListenerApplicationEnd(5L) @@ -926,7 +927,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { // Write an incomplete app log. val appLog = newLogFile(appId, None, inProgress = true) - writeFile(appLog, true, None, + writeFile(appLog, None, SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None) ) provider.checkForLogs() @@ -939,7 +940,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { } // Add more info to the app log, and trigger the provider to update things. - writeFile(appLog, true, None, + writeFile(appLog, None, SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None), SparkListenerJobStart(0, 1L, Nil, null) ) @@ -966,13 +967,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { // Write logs for two app attempts. clock.advance(1) val attempt1 = newLogFile(appId, Some("1"), inProgress = false) - writeFile(attempt1, true, None, + writeFile(attempt1, None, SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")), SparkListenerJobStart(0, 1L, Nil, null), SparkListenerApplicationEnd(5L) ) val attempt2 = newLogFile(appId, Some("2"), inProgress = false) - writeFile(attempt2, true, None, + writeFile(attempt2, None, SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")), SparkListenerJobStart(0, 1L, Nil, null), SparkListenerApplicationEnd(5L) @@ -1034,7 +1035,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { // Create an incomplete log file, has an end record but no start record. val corrupt = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false) - writeFile(corrupt, true, None, SparkListenerApplicationEnd(0)) + writeFile(corrupt, None, SparkListenerApplicationEnd(0)) corrupt.setLastModified(clock.getTimeMillis()) logCount += 1 @@ -1050,7 +1051,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { // Update the slow app to contain valid info. Code should detect the change and not clean // it up. - writeFile(slowApp, true, None, + writeFile(slowApp, None, SparkListenerApplicationStart(slowApp.getName(), Some(slowApp.getName()), 1L, "test", None)) slowApp.setLastModified(clock.getTimeMillis()) validLogCount += 1 @@ -1067,7 +1068,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { // Create a log file where the end event is before the configure chunk to be reparsed at // the end of the file. The correct listing should still be generated. val log = newLogFile("end-event-test", None, inProgress = false) - writeFile(log, true, None, + writeFile(log, None, Seq( SparkListenerApplicationStart("end-event-test", Some("end-event-test"), 1L, "test", None), SparkListenerEnvironmentUpdate(Map( @@ -1096,13 +1097,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val provider = new FsHistoryProvider(conf) val complete = newLogFile("complete", None, inProgress = false) - writeFile(complete, true, None, + writeFile(complete, None, SparkListenerApplicationStart("complete", Some("complete"), 1L, "test", None), SparkListenerApplicationEnd(5L) ) val incomplete = newLogFile("incomplete", None, inProgress = true) - writeFile(incomplete, true, None, + writeFile(incomplete, None, SparkListenerApplicationStart("incomplete", Some("incomplete"), 1L, "test", None) ) @@ -1116,10 +1117,10 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val clock = new ManualClock(1533132471) val provider = new FsHistoryProvider(createTestConf(), clock) val accessDenied = newLogFile("accessDenied", None, inProgress = false) - writeFile(accessDenied, true, None, + writeFile(accessDenied, None, SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None)) val accessGranted = newLogFile("accessGranted", None, inProgress = false) - writeFile(accessGranted, true, None, + writeFile(accessGranted, None, SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None), SparkListenerApplicationEnd(5L)) var isReadable = false @@ -1189,34 +1190,34 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { val clock = new ManualClock(0) (5 to 0 by -1).foreach { num => val log1_1 = newLogFile("app1", Some("attempt1"), inProgress = false) - writeFile(log1_1, true, None, + writeFile(log1_1, None, SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")), SparkListenerApplicationEnd(2L) ) log1_1.setLastModified(2L) val log2_1 = newLogFile("app2", Some("attempt1"), inProgress = false) - writeFile(log2_1, true, None, + writeFile(log2_1, None, SparkListenerApplicationStart("app2", Some("app2"), 3L, "test", Some("attempt1")), SparkListenerApplicationEnd(4L) ) log2_1.setLastModified(4L) val log3_1 = newLogFile("app3", Some("attempt1"), inProgress = false) - writeFile(log3_1, true, None, + writeFile(log3_1, None, SparkListenerApplicationStart("app3", Some("app3"), 5L, "test", Some("attempt1")), SparkListenerApplicationEnd(6L) ) log3_1.setLastModified(6L) val log1_2_incomplete = newLogFile("app1", Some("attempt2"), inProgress = false) - writeFile(log1_2_incomplete, true, None, + writeFile(log1_2_incomplete, None, SparkListenerApplicationStart("app1", Some("app1"), 7L, "test", Some("attempt2")) ) log1_2_incomplete.setLastModified(8L) val log3_2 = newLogFile("app3", Some("attempt2"), inProgress = false) - writeFile(log3_2, true, None, + writeFile(log3_2, None, SparkListenerApplicationStart("app3", Some("app3"), 9L, "test", Some("attempt2")), SparkListenerApplicationEnd(10L) ) @@ -1250,19 +1251,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { checkFn(provider.getListing().toSeq) } - private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec], + private def writeFile(file: File, codec: Option[CompressionCodec], events: SparkListenerEvent*) = { val fstream = new FileOutputStream(file) val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream) val bstream = new BufferedOutputStream(cstream) - if (isNewFormat) { - val newFormatStream = new FileOutputStream(file) - Utils.tryWithSafeFinally { - EventLoggingListener.initEventLog(newFormatStream, false, null) - } { - newFormatStream.close() - } - } + EventLoggingListener.initEventLog(bstream, false, null) val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8) Utils.tryWithSafeFinally {