Commit c0794be

Marcelo Vanzin committed
Correctly clean up staging directory.
This change also avoids overriding the app's status with "SUCCEEDED" in cluster mode when the shutdown hook runs, by signaling the AM that the SparkContext was shut down (see YarnClusterScheduler.scala). That way the AM can correctly expose its final status to the RM.
1 parent: 92770cc
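In short, the signaling path described above looks like the fragment below, condensed from the diffs in this commit (cluster mode only; surrounding code omitted):

// Fragment condensed from this commit's diffs, for orientation only.
// YarnClusterScheduler.scala: the scheduler tells the AM that the SparkContext
// is being stopped, instead of the AM assuming success in its shutdown hook.
override def stop() {
  super.stop()
  ApplicationMaster.sparkContextStopped(sc)
}

// ApplicationMaster.scala: the AM clears its registered context; the shutdown
// hook then only reports SUCCEEDED if it had to stop a still-live SparkContext.
private def sparkContextStopped(sc: SparkContext) = {
  sparkContextRef.compareAndSet(sc, null)
}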

3 files changed (+60, -62 lines changed)

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 49 additions & 58 deletions
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.io.IOException
 import java.net.Socket
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
 
 import scala.collection.JavaConversions._
 import scala.util.Try

@@ -57,6 +57,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
 
   @volatile private var finished = false
+  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+
   private var reporterThread: Thread = _
   private var allocator: YarnAllocator = _
 

@@ -66,9 +68,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
   // Fields used in cluster mode.
   private val sparkContextRef = new AtomicReference[SparkContext](null)
-  private val userResult = new AtomicBoolean(false)
 
-  final def run(): Unit = {
+  final def run(): Int = {
     if (isDriver) {
       // Set the web ui port to be ephemeral for yarn so we don't conflict with
       // other spark processes running on the same box

@@ -80,43 +81,49 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
     logInfo("ApplicationAttemptId: " + client.getAttemptId())
 
-    // If this is the last attempt, register a shutdown hook to cleanup the staging dir
-    // after the app is finished, in case it does not exit through the expected means.
-    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
-    if (isLastAttempt()) {
-      val cleanupHook = new Runnable {
-        override def run() {
-          logInfo("AppMaster received a signal.")
-          if (!finished) {
-            cleanupStagingDir()
-          }
+    val cleanupHook = new Runnable {
+      override def run() {
+        // If the SparkContext is still registered, shut it down as a best case effort in case
+        // users do not call sc.stop or do System.exit().
+        val sc = sparkContextRef.get()
+        if (sc != null) {
+          logInfo("Invoking sc stop from shutdown hook")
+          sc.stop()
+          finish(FinalApplicationStatus.SUCCEEDED)
+        }
+
+        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
+        // running the AM.
+        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+        if (finished || isLastAttempt) {
+          cleanupStagingDir()
         }
       }
-      ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
     }
+    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
+    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
 
     // Call this to force generation of secret so it gets populated into the
     // Hadoop UGI. This has to happen before the startUserClass which does a
     // doAs in order for the credentials to be passed on to the executor containers.
     val securityMgr = new SecurityManager(sparkConf)
 
-    val success =
-      try {
-        if (isDriver) runDriver() else runExecutorLauncher(securityMgr)
-      } catch {
-        case e: Exception =>
-          logError("Exception while running AM main loop.", e)
-          false
-      }
+    if (isDriver) {
+      runDriver()
+    } else {
+      runExecutorLauncher(securityMgr)
+    }
 
-    finish(if (success) FinalApplicationStatus.SUCCEEDED else FinalApplicationStatus.FAILED)
-    val shouldCleanup = success || isLastAttempt()
-    if (shouldCleanup) {
-      cleanupStagingDir()
+    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
+      finish(finalStatus)
+      0
+    } else {
+      1
     }
   }
 
-  final def finish(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
     if (!finished) {
       logInfo(s"Finishing ApplicationMaster with $status" +
         Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))

@@ -127,33 +134,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
           reporterThread.join()
         }
       } finally {
-        client.shutdown(status, diagnostics)
+        client.shutdown(status, Option(diagnostics).getOrElse(""))
       }
     }
   }
 
-  private[spark] def sparkContextInitialized(sc: SparkContext) = {
-    var modified = false
+  private def sparkContextInitialized(sc: SparkContext) = {
     sparkContextRef.synchronized {
-      modified = sparkContextRef.compareAndSet(null, sc)
+      sparkContextRef.compareAndSet(null, sc)
       sparkContextRef.notifyAll()
     }
+  }
 
-    // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do
-    // System.exit.
-    // Should not really have to do this, but it helps YARN to evict resources earlier.
-    // Not to mention, prevent the Client from declaring failure even though we exited properly.
-    // Note that this will unfortunately not properly clean up the staging files because it gets
-    // called too late, after the filesystem is already shutdown.
-    if (modified) {
-      Runtime.getRuntime().addShutdownHook(new Thread {
-        override def run() {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-          finish(FinalApplicationStatus.SUCCEEDED)
-        }
-      })
-    }
+  private def sparkContextStopped(sc: SparkContext) = {
+    sparkContextRef.compareAndSet(sc, null)
   }
 
   private def registerAM(uiAddress: String, uiHistoryAddress: String) = {

@@ -168,7 +162,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     reporterThread = launchReporterThread()
   }
 
-  private def runDriver(): Boolean = {
+  private def runDriver(): Unit = {
     addAmIpFilter()
     val userThread = startUserClass()
 

@@ -179,20 +173,18 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     // If there is no SparkContext at this point, just fail the app.
     if (sc == null) {
       finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
-      false
     } else {
       registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
       try {
         userThread.join()
-        userResult.get()
       } finally {
         // In cluster mode, ask the reporter thread to stop since the user app is finished.
         reporterThread.interrupt()
       }
     }
   }
 
-  private def runExecutorLauncher(securityMgr: SecurityManager): Boolean = {
+  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
     actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
       conf = sparkConf, securityManager = securityMgr)._1
     actor = waitForSparkDriver()

@@ -201,12 +193,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
-    true
-  }
-
-  private def isLastAttempt() = {
-    val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
-    client.getAttemptId().getAttemptId() >= maxAppAttempts
+    finalStatus = FinalApplicationStatus.SUCCEEDED
   }
 
   private def launchReporterThread(): Thread = {

@@ -361,7 +348,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         mainMethod.invoke(null, mainArgs)
         // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
         // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
-        userResult.set(true)
+        finalStatus = FinalApplicationStatus.SUCCEEDED
       } finally {
         logDebug("Finishing main")
       }

@@ -408,14 +395,18 @@ object ApplicationMaster extends Logging {
     val amArgs = new ApplicationMasterArguments(args)
     SparkHadoopUtil.get.runAsSparkUser { () =>
       master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
-      master.run()
+      System.exit(master.run())
     }
   }
 
   private[spark] def sparkContextInitialized(sc: SparkContext) = {
     master.sparkContextInitialized(sc)
   }
 
+  private[spark] def sparkContextStopped(sc: SparkContext) = {
+    master.sparkContextStopped(sc)
+  }
+
 }
 
 /**

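The cleanup hook in the diff above is registered through Hadoop's ShutdownHookManager rather than Runtime.addShutdownHook, so it still runs while HDFS is usable: hooks with a higher priority fire before the FileSystem's own shutdown hook, which is what the "priority 30 is higher than HDFS" comment refers to. Below is a minimal standalone sketch of that pattern; it is not part of this commit, the object name and staging path are made up for illustration, and it assumes a Hadoop 2.x client on the classpath.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.util.ShutdownHookManager

// Illustrative sketch only: registers a cleanup hook the same way the AM does,
// at priority 30 so it runs before Hadoop shuts down its FileSystem clients.
// A plain Runtime shutdown hook gives no such ordering, which is why the old
// hook (removed in this commit) could not reliably delete the staging dir.
object StagingDirCleanupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val stagingDir = new Path("/tmp/example-staging-dir")  // hypothetical path

    val cleanupHook = new Runnable {
      override def run(): Unit = {
        val fs = FileSystem.get(conf)
        fs.delete(stagingDir, true)  // best-effort recursive delete
      }
    }
    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
  }
}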
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala

Lines changed: 10 additions & 3 deletions
@@ -24,10 +24,11 @@ import org.apache.spark.util.Utils
 import org.apache.hadoop.conf.Configuration
 
 /**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
+ * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
+ * ApplicationMaster, etc is done
  */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
+  extends TaskSchedulerImpl(sc) {
 
   logInfo("Created YarnClusterScheduler")
 

@@ -51,4 +52,10 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
     super.postStartHook()
     logInfo("YarnClusterScheduler.postStartHook done")
   }
+
+  override def stop() {
+    super.stop()
+    ApplicationMaster.sparkContextStopped(sc)
+  }
+
 }

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 1 addition & 1 deletion
@@ -112,7 +112,7 @@ private[yarn] class YarnAllocationHandler(
   }
 
   override def allocateResources() = {
-    addResourceRequests(maxExecutors - numPendingAllocate.get())
+    addResourceRequests(maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get())
 
     // We have already set the container request. Poll the ResourceManager for a response.
     // This doubles as a heartbeat if there are no pending container requests.

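The effect of the one-line allocator fix above is easiest to see with numbers: previously, executors that were already running were not subtracted, so each allocation round could request more containers than the application actually needed. A tiny illustrative calculation follows (the values and object name are made up for this example):

// Illustrative only: the arithmetic behind the allocateResources() change.
object MissingExecutorsSketch {
  def missing(maxExecutors: Int, pending: Int, running: Int): Int =
    maxExecutors - pending - running

  def main(args: Array[String]): Unit = {
    val (maxExecutors, pending, running) = (10, 2, 5)
    // Old formula ignored running executors: requests 10 - 2 = 8 more containers.
    println(maxExecutors - pending)                   // 8 (over-allocates)
    // Fixed formula requests only the actual shortfall: 10 - 2 - 5 = 3.
    println(missing(maxExecutors, pending, running))  // 3
  }
}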