@@ -34,7 +34,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
3434
3535import org .apache .spark .{Logging , SparkConf , SparkContext , SparkException , TestUtils }
3636import org .apache .spark .scheduler .cluster .ExecutorInfo
37- import org .apache .spark .scheduler .{SparkListenerApplicationStart , SparkListener ,
37+ import org .apache .spark .scheduler .{SparkListener , SparkListenerApplicationStart ,
3838 SparkListenerExecutorAdded }
3939import org .apache .spark .util .Utils
4040
@@ -293,6 +293,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
293293private [spark] class SaveExecutorInfo extends SparkListener {
294294 val addedExecutorInfos = mutable.Map [String , ExecutorInfo ]()
295295 var driverLogs : Option [collection.Map [String , String ]] = None
296+
296297 override def onExecutorAdded (executor : SparkListenerExecutorAdded ) {
297298 addedExecutorInfos(executor.executorId) = executor.executorInfo
298299 }
@@ -327,34 +328,35 @@ private object YarnClusterDriver extends Logging with Matchers {
327328 val data = sc.parallelize(1 to 4 , 4 ).collect().toSet
328329 assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS ))
329330 data should be (Set (1 , 2 , 3 , 4 ))
330-
331- // verify log urls are present
332- val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo ]
333- assert(listeners.size === 1 )
334- val listener = listeners(0 )
335- val executorInfos = listener.addedExecutorInfos.values
336- assert(executorInfos.nonEmpty)
337- executorInfos.foreach { info =>
338- assert(info.logUrlMap.nonEmpty)
339- }
340-
341- // YARN does some weird redirects after the app is done, so check before it is complete.
342- if (conf.get(" spark.master" ) == " yarn-cluster" ) {
343- val driverLogs = listener.driverLogs.get
344- assert(driverLogs.size === 2 )
345- assert(driverLogs.containsKey(" stderr" ))
346- assert(driverLogs.containsKey(" stdout" ))
347- val stderr = driverLogs(" stderr" ) // YARN puts everything in stderr.
348- val lines = Source .fromURL(stderr).getLines()
349- // Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
350- // cluster mode.
351- assert(lines.exists(_.contains(" YarnClusterSchedulerBackend" )))
352- }
353331 result = " success"
354332 } finally {
355333 sc.stop()
356334 Files .write(result, status, UTF_8 )
357335 }
336+
337+ // verify log urls are present
338+ val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo ]
339+ assert(listeners.size === 1 )
340+ val listener = listeners(0 )
341+ val executorInfos = listener.addedExecutorInfos.values
342+ assert(executorInfos.nonEmpty)
343+ executorInfos.foreach { info =>
344+ assert(info.logUrlMap.nonEmpty)
345+ }
346+
347+ // If we are running in yarn-cluster mode, verify that driver logs are downloadable.
348+ if (conf.get(" spark.master" ) == " yarn-cluster" ) {
349+ assert(listener.driverLogs.nonEmpty)
350+ val driverLogs = listener.driverLogs.get
351+ assert(driverLogs.size === 2 )
352+ assert(driverLogs.containsKey(" stderr" ))
353+ assert(driverLogs.containsKey(" stdout" ))
354+ val stderr = driverLogs(" stderr" ) // YARN puts everything in stderr.
355+ val lines = Source .fromURL(stderr).getLines()
356+ // Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
357+ // cluster mode.
358+ assert(lines.exists(_.contains(" YarnClusterSchedulerBackend" )))
359+ }
358360 }
359361
360362}
0 commit comments