diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index fc925022b271..ca6a3ef3ebbb 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -90,6 +90,9 @@ private[spark] class ApplicationMaster( @volatile private var reporterThread: Thread = _ @volatile private var allocator: YarnAllocator = _ + // A flag to check whether user has initialized spark context + @volatile private var registered = false + private val userClassLoader = { val classpath = Client.getUserClasspath(sparkConf) val urls = classpath.map { entry => @@ -319,7 +322,7 @@ private[spark] class ApplicationMaster( */ final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { synchronized { - if (!unregistered) { + if (registered && !unregistered) { logInfo(s"Unregistering ApplicationMaster with $status" + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) unregistered = true @@ -332,10 +335,15 @@ private[spark] class ApplicationMaster( synchronized { if (!finished) { val inShutdown = ShutdownHookManager.inShutdown() - logInfo(s"Final app status: $status, exitCode: $code" + + if (registered) { + exitCode = code + finalStatus = status + } else { + finalStatus = FinalApplicationStatus.FAILED + exitCode = ApplicationMaster.EXIT_SC_NOT_INITED + } + logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) - exitCode = code - finalStatus = status finalMsg = msg finished = true if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { @@ -439,12 +447,11 @@ private[spark] class ApplicationMaster( sc.getConf.get("spark.driver.port"), isClusterMode = true) registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr) + registered = true } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. - if (!finished) { - throw new IllegalStateException("SparkContext is null but app is still running!") - } + throw new IllegalStateException("User did not initialize spark context!") } userClassThread.join() } catch {