Skip to content

Commit 70e824f

Browse files
committed
[SPARK-3627] - [yarn] - fix exit code and final status reporting to RM
See the description and whats handled in the jira comment: https://issues.apache.org/jira/browse/SPARK-3627?focusedCommentId=14150013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14150013 This does not handle yarn client mode reporting of the driver to the AM. I think that should be handled when we make it an unmanaged AM. Author: Thomas Graves <[email protected]> Closes #2577 from tgravescs/SPARK-3627 and squashes the following commits: 9c2efbf [Thomas Graves] review comments e8cc261 [Thomas Graves] fix accidental typo during fixing comment 24c98e3 [Thomas Graves] rework 85f1901 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into SPARK-3627 fab166d [Thomas Graves] update based on review comments 32f4dfa [Thomas Graves] switch back f0b6519 [Thomas Graves] change order of cleanup staging dir d3cc800 [Thomas Graves] SPARK-3627 - yarn - fix exit code and final status reporting to RM
1 parent 69c3f44 commit 70e824f

File tree

4 files changed

+212
-126
lines changed

4 files changed

+212
-126
lines changed

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
4040
private var rpc: YarnRPC = null
4141
private var resourceManager: AMRMProtocol = _
4242
private var uiHistoryAddress: String = _
43+
private var registered: Boolean = false
4344

4445
override def register(
4546
conf: YarnConfiguration,
@@ -51,8 +52,11 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
5152
this.rpc = YarnRPC.create(conf)
5253
this.uiHistoryAddress = uiHistoryAddress
5354

54-
resourceManager = registerWithResourceManager(conf)
55-
registerApplicationMaster(uiAddress)
55+
synchronized {
56+
resourceManager = registerWithResourceManager(conf)
57+
registerApplicationMaster(uiAddress)
58+
registered = true
59+
}
5660

5761
new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
5862
preferredNodeLocations, securityMgr)
@@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
6670
appAttemptId
6771
}
6872

69-
override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
70-
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
71-
.asInstanceOf[FinishApplicationMasterRequest]
72-
finishReq.setAppAttemptId(getAttemptId())
73-
finishReq.setFinishApplicationStatus(status)
74-
finishReq.setDiagnostics(diagnostics)
75-
finishReq.setTrackingUrl(uiHistoryAddress)
76-
resourceManager.finishApplicationMaster(finishReq)
73+
override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
74+
if (registered) {
75+
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
76+
.asInstanceOf[FinishApplicationMasterRequest]
77+
finishReq.setAppAttemptId(getAttemptId())
78+
finishReq.setFinishApplicationStatus(status)
79+
finishReq.setDiagnostics(diagnostics)
80+
finishReq.setTrackingUrl(uiHistoryAddress)
81+
resourceManager.finishApplicationMaster(finishReq)
82+
}
7783
}
7884

7985
override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {

0 commit comments

Comments
 (0)