Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -374,29 +374,33 @@ private[spark] class ApplicationMaster(
*/
final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = {
synchronized {
if (registered && !unregistered) {
if (!unregistered) {
logInfo(s"Unregistering ApplicationMaster with $status" +
Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
unregistered = true
client.unregister(status, Option(diagnostics).getOrElse(""))
client.unregister(status, yarnConf, Option(diagnostics).getOrElse(""))
}
}
}

final def finish(status: FinalApplicationStatus, code: Int, msg: String = null): Unit = {
synchronized {
if (!finished) {
var errorMsg = msg
val inShutdown = ShutdownHookManager.inShutdown()
if (registered || !isClusterMode) {
exitCode = code
finalStatus = status
} else {
finalStatus = FinalApplicationStatus.FAILED
exitCode = ApplicationMaster.EXIT_SC_NOT_INITED
if (msg == null) {
errorMsg = "Failed to initialize Spark context"
}
}
logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" +
Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
finalMsg = ComStrUtils.abbreviate(msg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt)
Option(errorMsg).map(msg => s", (reason: $errorMsg)").getOrElse(""))
finalMsg = ComStrUtils.abbreviate(errorMsg, sparkConf.get(AM_FINAL_MSG_LIMIT).toInt)
finished = true
if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) {
logDebug("shutting down reporter thread")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,16 @@ private[spark] class YarnRMClient extends Logging {
* @param status The final status of the AM.
* @param diagnostics Diagnostics message to include in the final status.
*/
def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized {
if (registered) {
amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
def unregister(status: FinalApplicationStatus,
conf: YarnConfiguration,
diagnostics: String = ""): Unit = synchronized {
if (!registered) {
amClient = AMRMClient.createAMRMClient()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is very odd to register during unregister — what if it was the register or start call that failed?
If registration timed out and you have a very high timeout configured, I assume you would wait that long again.
I'm definitely a bit hesitant about this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should a timeout control be added?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question for me is whether this is really needed versus possibly introducing other issues. While it may be convenient, the user can just look in the log to find the error. You tested a bunch of scenarios, which is great, but I'm sure there are others. The main one I'd mention here: what if registration failed or the RM is not responsive? Can you test that scenario and see what happens?

amClient.init(conf)
amClient.start()
registered = true
}
amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
if (amClient != null) {
amClient.stop()
}
Expand Down