Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ private[spark] class ApplicationMaster(
@volatile private var reporterThread: Thread = _
@volatile private var allocator: YarnAllocator = _

// A flag to check whether user has initialized spark context
@volatile private var registered = false

private val userClassLoader = {
val classpath = Client.getUserClasspath(sparkConf)
val urls = classpath.map { entry =>
Expand Down Expand Up @@ -319,7 +322,7 @@ private[spark] class ApplicationMaster(
*/
final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = {
synchronized {
if (!unregistered) {
if (registered && !unregistered) {
logInfo(s"Unregistering ApplicationMaster with $status" +
Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
unregistered = true
Expand All @@ -332,10 +335,15 @@ private[spark] class ApplicationMaster(
synchronized {
if (!finished) {
val inShutdown = ShutdownHookManager.inShutdown()
logInfo(s"Final app status: $status, exitCode: $code" +
if (registered) {
exitCode = code
finalStatus = status
} else {
finalStatus = FinalApplicationStatus.FAILED
exitCode = ApplicationMaster.EXIT_SC_NOT_INITED
}
logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" +
Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
exitCode = code
finalStatus = status
finalMsg = msg
finished = true
if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
Expand Down Expand Up @@ -439,12 +447,11 @@ private[spark] class ApplicationMaster(
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
registered = true
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
if (!finished) {
throw new IllegalStateException("SparkContext is null but app is still running!")
}
throw new IllegalStateException("User did not initialize spark context!")
}
userClassThread.join()
} catch {
Expand Down