Skip to content

Commit b6289ab

Browse files
author
Marcelo Vanzin
committed
Move cluster/client code to separate methods.
Makes code a little cleaner and easier to follow.
1 parent ecb23cd commit b6289ab

File tree

1 file changed

+52
-54
lines changed

1 file changed

+52
-54
lines changed

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 52 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -90,60 +90,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
9090
// doAs in order for the credentials to be passed on to the executor containers.
9191
val securityMgr = new SecurityManager(sparkConf)
9292

93-
val (sc, uiAddress, uiHistoryAddress) =
94-
if (isDriver) {
95-
addAmIpFilter()
96-
userThread = startUserClass()
97-
98-
// This a bit hacky, but we need to wait until the spark.driver.port property has
99-
// been set by the Thread executing the user class.
100-
waitForSparkContextInitialized()
101-
102-
val sc = sparkContextRef.get()
103-
104-
// If there is no SparkContext at this point, just fail the app.
105-
if (sc == null) {
106-
107-
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
108-
if (isLastAttempt()) {
109-
cleanupStagingDir()
110-
}
111-
return
112-
}
113-
114-
(sc, sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
115-
} else {
116-
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
117-
conf = sparkConf, securityManager = securityMgr)._1
118-
waitForSparkDriver()
119-
addAmIpFilter()
120-
(null, sparkConf.get("spark.driver.appUIAddress", ""), "")
121-
}
122-
12393
val success =
12494
try {
125-
allocator = client.register(yarnConf,
126-
if (sc != null) sc.getConf else sparkConf,
127-
if (sc != null) sc.preferredNodeLocationData else Map(),
128-
uiAddress,
129-
uiHistoryAddress)
130-
131-
reporterThread = launchReporterThread()
132-
allocateExecutors()
133-
134-
if (isDriver) {
135-
try {
136-
userThread.join()
137-
userResult.get()
138-
} finally {
139-
// In cluster mode, ask the reporter thread to stop since the user app is finished.
140-
reporterThread.interrupt()
141-
}
142-
} else {
143-
// In client mode the actor will stop the reporter thread.
144-
reporterThread.join()
145-
true
146-
}
95+
if (isDriver) runDriver() else runExecutorLauncher(securityMgr)
14796
} catch {
14897
case e: Exception =>
14998
logError("Exception while running AM main loop.", e)
@@ -195,6 +144,55 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
195144
}
196145
}
197146

147+
private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
148+
val sc = sparkContextRef.get()
149+
allocator = client.register(yarnConf,
150+
if (sc != null) sc.getConf else sparkConf,
151+
if (sc != null) sc.preferredNodeLocationData else Map(),
152+
uiAddress,
153+
uiHistoryAddress)
154+
155+
reporterThread = launchReporterThread()
156+
}
157+
158+
private def runDriver(): Boolean = {
159+
addAmIpFilter()
160+
userThread = startUserClass()
161+
162+
// This a bit hacky, but we need to wait until the spark.driver.port property has
163+
// been set by the Thread executing the user class.
164+
waitForSparkContextInitialized()
165+
166+
val sc = sparkContextRef.get()
167+
168+
// If there is no SparkContext at this point, just fail the app.
169+
if (sc == null) {
170+
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
171+
false
172+
} else {
173+
registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
174+
try {
175+
userThread.join()
176+
userResult.get()
177+
} finally {
178+
// In cluster mode, ask the reporter thread to stop since the user app is finished.
179+
reporterThread.interrupt()
180+
}
181+
}
182+
}
183+
184+
private def runExecutorLauncher(securityMgr: SecurityManager): Boolean = {
185+
actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
186+
conf = sparkConf, securityManager = securityMgr)._1
187+
waitForSparkDriver()
188+
addAmIpFilter()
189+
registerAM(sparkConf.get("spark.driver.appUIAddress", ""), "")
190+
191+
// In client mode the actor will stop the reporter thread.
192+
reporterThread.join()
193+
true
194+
}
195+
198196
private def isLastAttempt() = {
199197
val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
200198
client.getAttemptId().getAttemptId() >= maxAppAttempts
@@ -297,7 +295,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
297295
var driverUp = false
298296
val hostport = args.userArgs(0)
299297
val (driverHost, driverPort) = Utils.parseHostPort(hostport)
300-
while(!driverUp) {
298+
while (!driverUp) {
301299
try {
302300
val socket = new Socket(driverHost, driverPort)
303301
socket.close()
@@ -307,7 +305,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
307305
case e: Exception =>
308306
logError("Failed to connect to driver at %s:%s, retrying ...".
309307
format(driverHost, driverPort))
310-
Thread.sleep(100)
308+
Thread.sleep(100)
311309
}
312310
}
313311
sparkConf.set("spark.driver.host", driverHost)

0 commit comments

Comments
 (0)