@@ -69,7 +69,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private val sparkContextRef = new AtomicReference[SparkContext](null)
   private val userResult = new AtomicBoolean(false)
 
-  final def run() = {
+  final def run(): Unit = {
     // Setup the directories so things go to YARN approved directories rather
     // than user specified and /tmp.
     System.setProperty("spark.local.dir", getLocalDirs())
@@ -83,8 +83,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       System.setProperty("spark.master", "yarn-cluster")
     }
 
-    val attemptId = client.getAttemptId()
-    logInfo("ApplicationAttemptId: " + attemptId)
+    logInfo("ApplicationAttemptId: " + client.getAttemptId())
 
     // Call this to force generation of secret so it gets populated into the
     // Hadoop UGI. This has to happen before the startUserClass which does a
@@ -102,6 +101,16 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       waitForSparkContextInitialized()
 
       val sc = sparkContextRef.get()
+
+      // If there is no SparkContext at this point, just fail the app.
+      if (sc == null) {
+        finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
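+        // Only delete the staging directory if YARN will not retry the application.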
+        if (isLastAttempt()) {
+          cleanupStagingDir()
+        }
+        return
+      }
+
       (sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
     } else {
       actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
@@ -146,14 +155,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
     finish(if (success) FinalApplicationStatus.SUCCEEDED else FinalApplicationStatus.FAILED)
 
-    val shouldCleanup =
-      if (success) {
-        true
-      } else {
-        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
-        attemptId.getAttemptId() >= maxAppAttempts
-      }
-
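+    // Clean up on success; on failure, only when YARN will not make another attempt.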
+    val shouldCleanup = success || isLastAttempt()
     if (shouldCleanup) {
       cleanupStagingDir()
     }
@@ -197,6 +199,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     }
   }
 
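+  // Whether the current application attempt is the last one YARN will run for this app.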
+  private def isLastAttempt() = {
+    val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+    client.getAttemptId().getAttemptId() >= maxAppAttempts
+  }
+
   /** Get the Yarn approved local directories. */
   private def getLocalDirs(): String = {
     // Hadoop 0.23 and 2.x have different Environment variable names for the
@@ -316,16 +323,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   }
 
   private def allocateExecutors() = {
+    logInfo("Requesting " + args.numExecutors + " executors.")
     try {
-      logInfo("Requesting " + args.numExecutors + " executors.")
-      allocator.allocateResources()
-
-      var iters = 0
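+      // Poll the allocator until all requested executors are running or the AM is finishing.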
       while (allocator.getNumExecutorsRunning < args.numExecutors && !finished) {
         checkNumExecutorsFailed()
         allocator.allocateResources()
         Thread.sleep(ALLOCATE_HEARTBEAT_INTERVAL)
-        iters += 1
       }
     }
     logInfo("All executors have launched.")