diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 15b581cdcc1b..5cf9889d2f43 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -374,11 +374,11 @@ private[spark] class ApplicationMaster( */ final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { synchronized { - if (registered && !unregistered) { + if (!unregistered) { logInfo(s"Unregistering ApplicationMaster with $status" + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) unregistered = true - client.unregister(status, Option(diagnostics).getOrElse("")) + client.unregister(status, yarnConf, Option(diagnostics).getOrElse("")) } } } @@ -386,6 +386,7 @@ private[spark] class ApplicationMaster( final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = { synchronized { if (!finished) { + var errorMsg = msg val inShutdown = ShutdownHookManager.inShutdown() if (registered || !isClusterMode) { exitCode = code @@ -393,10 +394,13 @@ private[spark] class ApplicationMaster( } else { finalStatus = FinalApplicationStatus.FAILED exitCode = ApplicationMaster.EXIT_SC_NOT_INITED + if (msg == null) { + errorMsg = "Failed to initialize Spark context" + } } logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" + - Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) - finalMsg = ComStrUtils.abbreviate(msg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt) + Option(errorMsg).map(msg => s", (reason: $errorMsg)").getOrElse("")) + finalMsg = ComStrUtils.abbreviate(errorMsg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt) finished = true if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { logDebug("shutting down reporter thread") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 842611807db4..f7ebf43c3dd8 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -93,10 +93,16 @@ private[spark] class YarnRMClient extends Logging { * @param status The final status of the AM. * @param diagnostics Diagnostics message to include in the final status. */ - def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized { - if (registered) { - amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) + def unregister(status: FinalApplicationStatus, + conf: YarnConfiguration, + diagnostics: String = ""): Unit = synchronized { + if (!registered) { + amClient = AMRMClient.createAMRMClient() + amClient.init(conf) + amClient.start() + registered = true } + amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) if (amClient != null) { amClient.stop() }