Skip to content

Commit 34f1e63

Browse files
author
Marcelo Vanzin
committed
Fix some questionable error handling.
1 parent 5657c7d commit 34f1e63

File tree

1 file changed

+50
-54
lines changed

1 file changed

+50
-54
lines changed

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 50 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -90,71 +90,67 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
9090
// doAs in order for the credentials to be passed on to the executor containers.
9191
val securityMgr = new SecurityManager(sparkConf)
9292

93-
val (uiAddress, uiHistoryAddress) = if (isDriver) {
94-
addAmIpFilter()
93+
val (sc, uiAddress, uiHistoryAddress) =
94+
if (isDriver) {
95+
addAmIpFilter()
96+
userThread = startUserClass()
9597

96-
// Start the user's JAR
97-
userThread = startUserClass()
98+
// This a bit hacky, but we need to wait until the spark.driver.port property has
99+
// been set by the Thread executing the user class.
100+
waitForSparkContextInitialized()
98101

99-
// This a bit hacky, but we need to wait until the spark.driver.port property has
100-
// been set by the Thread executing the user class.
101-
waitForSparkContextInitialized()
102+
val sc = sparkContextRef.get()
102103

103-
val sc = sparkContextRef.get()
104+
// If there is no SparkContext at this point, just fail the app.
105+
if (sc == null) {
104106

105-
// If there is no SparkContext at this point, just fail the app.
106-
if (sc == null) {
107-
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
108-
if (isLastAttempt()) {
109-
cleanupStagingDir()
107+
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
108+
if (isLastAttempt()) {
109+
cleanupStagingDir()
110+
}
111+
return
110112
}
111-
return
112-
}
113-
114-
(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
115-
} else {
116-
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
117-
conf = sparkConf, securityManager = securityMgr)._1
118-
waitForSparkMaster()
119-
addAmIpFilter()
120-
(sparkConf.get("spark.driver.appUIAddress", ""), "")
121-
}
122-
123-
Utils.logUncaughtExceptions {
124-
val sc = sparkContextRef.get()
125-
allocator = client.register(yarnConf,
126-
if (sc != null) sc.getConf else sparkConf,
127-
if (sc != null) sc.preferredNodeLocationData else Map(),
128-
uiAddress,
129-
uiHistoryAddress)
130-
registered = true
131-
}
132113

133-
if (registered) {
134-
// Launch thread that will heartbeat to the RM so it won't think the app has died.
135-
reporterThread = launchReporterThread()
136-
137-
// Allocate all containers
138-
allocateExecutors()
139-
}
114+
(sc, sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
115+
} else {
116+
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
117+
conf = sparkConf, securityManager = securityMgr)._1
118+
waitForSparkMaster()
119+
addAmIpFilter()
120+
(null, sparkConf.get("spark.driver.appUIAddress", ""), "")
121+
}
140122

141123
val success =
142-
if (isDriver) {
143-
try {
144-
userThread.join()
145-
userResult.get()
146-
} finally {
147-
// In cluster mode, ask the reporter thread to stop since the user app is finished.
148-
reporterThread.interrupt()
124+
try {
125+
allocator = client.register(yarnConf,
126+
if (sc != null) sc.getConf else sparkConf,
127+
if (sc != null) sc.preferredNodeLocationData else Map(),
128+
uiAddress,
129+
uiHistoryAddress)
130+
131+
reporterThread = launchReporterThread()
132+
allocateExecutors()
133+
134+
if (isDriver) {
135+
try {
136+
userThread.join()
137+
userResult.get()
138+
} finally {
139+
// In cluster mode, ask the reporter thread to stop since the user app is finished.
140+
reporterThread.interrupt()
141+
}
142+
} else {
143+
// In client mode the actor will stop the reporter thread.
144+
reporterThread.join()
145+
true
149146
}
150-
} else {
151-
// In client mode the actor will stop the reporter thread.
152-
reporterThread.join()
153-
true
147+
} catch {
148+
case e: Exception =>
149+
logError("Exception while running AM main loop.", e)
150+
false
154151
}
155152

156153
finish(if (success) FinalApplicationStatus.SUCCEEDED else FinalApplicationStatus.FAILED)
157-
158154
val shouldCleanup = success || isLastAttempt()
159155
if (shouldCleanup) {
160156
cleanupStagingDir()
@@ -270,7 +266,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
270266
}
271267
}
272268

273-
// Note: this need to happen before allocateExecutors.
269+
// Note: this needs to happen before allocateExecutors.
274270
private def waitForSparkContextInitialized() {
275271
logInfo("Waiting for spark context initialization")
276272
try {

0 commit comments

Comments
 (0)