@@ -117,6 +117,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
117117 // Log file name, Spark version, Compression codec, in progress
118118 (" app1-1234-1.0.inprogress" , " 1.0" , None , true ),
119119 (" app2-1234-0.9.1" , " 0.9.1" , None , false ),
120+ (" app-with-dashes-in-name-1234-1.0.1.inprogress" , " 1.0.1" , None , true ),
120121 (" app3-1234-0.9-org.apache.spark.io.LZFCompressionCodec" , " 0.9" ,
121122 Some (classOf [LZFCompressionCodec ]), false ),
122123 (" app-123456-1234-0.8-org.apache.spark.io.SnappyCompressionCodec.inprogress" , " 0.8" ,
@@ -254,20 +255,21 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
254255
255256 // Ensure all asserts have actually been triggered
256257 eventExistenceListener.assertAllCallbacksInvoked()
257- }
258258
259- /**
260- * Assert that all of the specified events are logged by the given EventLoggingListener.
261- *
262- * This is done while the application is still running, so the log file contains the
263- * IN_PROGRESS suffix.
264- */
265- private def assertEventsExist (eventLogger : EventLoggingListener , events : Seq [String ]) {
266- val eventLoggingInfo = EventLoggingListener .parseLoggingInfo(
267- new Path (eventLogger.logPath + EventLoggingListener .IN_PROGRESS ))
268- assert(eventLoggingInfo != null )
259+ // Make sure expected events exist in the log file.
260+ val eventLoggingInfo = EventLoggingListener .parseLoggingInfo(new Path (eventLogger.logPath))
269261 val lines = readFileLines(eventLoggingInfo.path, eventLoggingInfo.compressionCodec)
270- val eventSet = mutable.Set (events : _* )
262+ val eventSet = mutable.Set (
263+ Utils .getFormattedClassName(SparkListenerApplicationStart ),
264+ Utils .getFormattedClassName(SparkListenerBlockManagerAdded ),
265+ Utils .getFormattedClassName(SparkListenerEnvironmentUpdate ),
266+ Utils .getFormattedClassName(SparkListenerJobStart ),
267+ Utils .getFormattedClassName(SparkListenerJobEnd ),
268+ Utils .getFormattedClassName(SparkListenerStageSubmitted ),
269+ Utils .getFormattedClassName(SparkListenerStageCompleted ),
270+ Utils .getFormattedClassName(SparkListenerTaskStart ),
271+ Utils .getFormattedClassName(SparkListenerTaskEnd ),
272+ Utils .getFormattedClassName(SparkListenerApplicationEnd ))
271273 lines.foreach { line =>
272274 eventSet.foreach { event =>
273275 if (line.contains(event)) {
@@ -307,30 +309,14 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
307309 var appEnded = false
308310
309311 override def onJobStart (jobStart : SparkListenerJobStart ) {
310- assertEventsExist(eventLogger, Seq [String ](
311- Utils .getFormattedClassName(SparkListenerApplicationStart ),
312- Utils .getFormattedClassName(SparkListenerBlockManagerAdded ),
313- Utils .getFormattedClassName(SparkListenerEnvironmentUpdate )
314- ))
315312 jobStarted = true
316313 }
317314
318315 override def onJobEnd (jobEnd : SparkListenerJobEnd ) {
319- assertEventsExist(eventLogger, Seq [String ](
320- Utils .getFormattedClassName(SparkListenerJobStart ),
321- Utils .getFormattedClassName(SparkListenerJobEnd ),
322- Utils .getFormattedClassName(SparkListenerStageSubmitted ),
323- Utils .getFormattedClassName(SparkListenerStageCompleted ),
324- Utils .getFormattedClassName(SparkListenerTaskStart ),
325- Utils .getFormattedClassName(SparkListenerTaskEnd )
326- ))
327316 jobEnded = true
328317 }
329318
330319 override def onApplicationEnd (applicationEnd : SparkListenerApplicationEnd ) {
331- assertEventsExist(eventLogger, Seq [String ](
332- Utils .getFormattedClassName(SparkListenerApplicationEnd )
333- ))
334320 appEnded = true
335321 }
336322
0 commit comments