
Commit ee22be0

mateiz authored and pwendell committed
Merge pull request alteryx#189 from tgravescs/sparkYarnErrorHandling
Improve Spark on YARN error handling

Improve CLI error handling and only allow a certain number of worker failures before failing the application. This helps prevent users from doing foolish things and having their jobs run forever, for instance using 32-bit Java but trying to allocate 8G containers. Without this change that case loops forever; now it errors out after a certain number of retries, and the number of retries is configurable.

Also increase the frequency at which we ping the ResourceManager, to speed up getting replacement containers when workers die. The YARN MR app defaults to pinging the RM every 1 second, so the default of 5 seconds here is fine, and it is configurable as well in case people want to change it.

I do want to make sure there aren't any cases where calling stopExecutors in CoarseGrainedSchedulerBackend would cause problems. I couldn't think of any, and I tested on a standalone cluster as well as on YARN.

(cherry picked from commit aa638ed)
Signed-off-by: Patrick Wendell <[email protected]>
1 parent d77c337 commit ee22be0
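
Both of the knobs described in the commit message fall back to computed defaults when the corresponding system property is unset. The following self-contained Scala sketch mirrors that default logic as it appears in the diffs below; the object and method names are ours, only the property names and formulas come from the commit.

// Illustrative sketch of the defaults introduced by this commit (not code from the commit itself).
object YarnErrorHandlingDefaults {
  // Default failure threshold: twice the requested workers, but never less than 3.
  def maxWorkerFailures(numWorkersRequested: Int): Int =
    System.getProperty("spark.yarn.max.worker.failures",
      math.max(numWorkersRequested * 2, 3).toString).toInt

  // Heartbeat interval: requested value (default 5000 ms), capped at half the RM expiry interval.
  def heartbeatIntervalMs(rmExpiryIntervalMs: Long): Long = {
    val requested =
      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
    math.min(rmExpiryIntervalMs / 2, requested)
  }

  def main(args: Array[String]): Unit = {
    println(maxWorkerFailures(10))         // 20 with defaults
    println(maxWorkerFailures(1))          // 3 with defaults
    println(heartbeatIntervalMs(120000L))  // 5000 with defaults
  }
}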

6 files changed, +61 -30 lines


core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 0 deletions
@@ -199,6 +199,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
   }

   override def stop() {
+    stopExecutors()
     try {
       if (driverActor != null) {
         val future = driverActor.ask(StopDriver)(timeout)

core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala

Lines changed: 0 additions & 1 deletion
@@ -62,7 +62,6 @@ private[spark] class SimrSchedulerBackend(
     val conf = new Configuration()
     val fs = FileSystem.get(conf)
     fs.delete(new Path(driverFilePath), false)
-    super.stopExecutors()
     super.stop()
   }
 }

docs/running-on-yarn.md

Lines changed: 2 additions & 0 deletions
@@ -37,6 +37,8 @@ System Properties:
 * 'spark.yarn.applicationMaster.waitTries', property to set the number of times the ApplicationMaster waits for the the spark master and then also the number of tries it waits for the Spark Context to be intialized. Default is 10.
 * 'spark.yarn.submit.file.replication', the HDFS replication level for the files uploaded into HDFS for the application. These include things like the spark jar, the app jar, and any distributed cache files/archives.
 * 'spark.yarn.preserve.staging.files', set to true to preserve the staged files(spark jar, app jar, distributed cache files) at the end of the job rather then delete them.
+* 'spark.yarn.scheduler.heartbeat.interval-ms', the interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. Default is 5 seconds.
+* 'spark.yarn.max.worker.failures', the maximum number of worker failures before failing the application. Default is the number of workers requested times 2 with minimum of 3.

 # Launching Spark on YARN

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 26 additions & 13 deletions
@@ -54,7 +54,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
     YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
   private var isLastAMRetry: Boolean = true
-
+  // default to numWorkers * 2, with minimum of 3
+  private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+    math.max(args.numWorkers * 2, 3).toString()).toInt

   def run() {
     // setup the directories so things go to yarn approved directories rather
@@ -227,12 +229,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e

         if (null != sparkContext) {
           uiAddress = sparkContext.ui.appUIAddress
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
-            sparkContext.preferredNodeLocationData)
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+            appAttemptId, args, sparkContext.preferredNodeLocationData)
         } else {
           logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
-            ", numTries = " + numTries)
-          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+            ", numTries = " + numTries)
+          this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+            appAttemptId, args)
         }
       }
     } finally {
@@ -251,8 +254,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
       // If user thread exists, then quit !
       userThread.isAlive) {
-
-      this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+      if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+        finishApplicationMaster(FinalApplicationStatus.FAILED,
+          "max number of worker failures reached")
+      }
+      yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
       ApplicationMaster.incrementAllocatorLoop(1)
       Thread.sleep(100)
     }
@@ -268,21 +274,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.

     val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-    // must be <= timeoutInterval/ 2.
-    // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
-    // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
-    val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+
+    // we want to be reasonably responsive without causing too many requests to RM.
+    val schedulerInterval =
+      System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+    // must be <= timeoutInterval / 2.
+    val interval = math.min(timeoutInterval / 2, schedulerInterval)
     launchReporterThread(interval)
   }
 }

-  // TODO: We might want to extend this to allocate more containers in case they die !
   private def launchReporterThread(_sleepTime: Long): Thread = {
     val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime

     val t = new Thread {
       override def run() {
         while (userThread.isAlive) {
+          if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+            finishApplicationMaster(FinalApplicationStatus.FAILED,
+              "max number of worker failures reached")
+          }
           val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
           if (missingWorkerCount > 0) {
             logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -321,7 +333,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
   */

-  def finishApplicationMaster(status: FinalApplicationStatus) {
+  def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {

     synchronized {
       if (isFinished) {
@@ -335,6 +347,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
       .asInstanceOf[FinishApplicationMasterRequest]
     finishReq.setAppAttemptId(appAttemptId)
     finishReq.setFinishApplicationStatus(status)
+    finishReq.setDiagnostics(diagnostics)
     // set tracking url to empty since we don't have a history server
     finishReq.setTrackingUrl("")
     resourceManager.finishApplicationMaster(finishReq)

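The ApplicationMaster changes boil down to the same guard evaluated in two places: the initial allocation loop and the reporter thread both finish the application with FinalApplicationStatus.FAILED once getNumWorkersFailed crosses the threshold, and otherwise keep topping up missing containers. The following self-contained Scala mock condenses that loop structure; MockAllocator and the concrete numbers are illustrative stand-ins, not Spark's real classes.

// Self-contained mock of the reporter-loop structure shown in the diff above (illustrative only).
object ReporterLoopSketch {
  final class MockAllocator {
    var numWorkersRunning = 0
    var numWorkersFailed = 0
    def allocateContainers(count: Int): Unit = numWorkersRunning += count
  }

  def main(args: Array[String]): Unit = {
    val numWorkers = 4
    val maxNumWorkerFailures = math.max(numWorkers * 2, 3) // default threshold: 8
    val allocator = new MockAllocator
    allocator.numWorkersFailed = 2                         // pretend two containers already died

    var iterations = 0
    while (allocator.numWorkersRunning < numWorkers && iterations < 10) {
      if (allocator.numWorkersFailed >= maxNumWorkerFailures) {
        println("finish AM as FAILED: max number of worker failures reached")
        return
      }
      val missing = numWorkers - allocator.numWorkersRunning
      allocator.allocateContainers(math.max(missing, 0))
      iterations += 1
      // the real loop sleeps spark.yarn.scheduler.heartbeat.interval-ms between passes
    }
    println(s"all $numWorkers workers running after $iterations iteration(s)")
  }
}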
yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 20 additions & 12 deletions
@@ -60,6 +60,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)

   def run() {
+    validateArgs()
+
     init(yarnConf)
     start()
     logClusterResourceDetails()
@@ -84,6 +86,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     System.exit(0)
   }

+  def validateArgs() = {
+    Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+      (args.userJar == null) -> "Error: You must specify a user jar!",
+      (args.userClass == null) -> "Error: You must specify a user class!",
+      (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+      (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+        ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+      (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+        ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
+    .foreach { case(cond, errStr) =>
+      if (cond) {
+        logError(errStr)
+        args.printUsageAndExit(1)
+      }
+    }
+  }
+
   def getAppStagingDir(appId: ApplicationId): String = {
     SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
   }
@@ -97,7 +116,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
       ", queueChildQueueCount=" + queueInfo.getChildQueues.size)
   }
-

   def verifyClusterResources(app: GetNewApplicationResponse) = {
     val maxMem = app.getMaximumResourceCapability().getMemory()
@@ -215,11 +233,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl

     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()

-    if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
-      logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
-      System.exit(1)
-    }
-
     Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
       Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
     .foreach { case(destName, _localPath) =>
@@ -334,7 +347,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     JAVA_OPTS += " -Djava.io.tmpdir=" +
       new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "

-
     // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
     // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
     // node, spark gc effects all other containers performance (which can also be other spark containers)
@@ -360,11 +372,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
     }

-    if (args.userClass == null) {
-      logError("Error: You must specify a user class!")
-      System.exit(1)
-    }
-
     val commands = List[String](javaCommand +
       " -server " +
       JAVA_OPTS +
@@ -442,6 +449,7 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")

     val args = new ClientArguments(argStrings)
+
     new Client(args).run
   }

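The new validateArgs drives all of the submission-time checks from a single collection of (condition, error message) pairs instead of the scattered if/System.exit blocks that the later hunks remove. Below is a small self-contained Scala sketch of the same data-driven pattern; the checks, parameter names, and exitWithUsage helper are hypothetical stand-ins for ClientArguments and printUsageAndExit, not Spark's real Client code.

// Illustrative sketch of the condition -> error-message validation pattern (hypothetical names).
object ValidationPatternSketch {
  private val memoryOverheadMb = 384 // assumed overhead, analogous to YarnAllocationHandler.MEMORY_OVERHEAD

  def exitWithUsage(msg: String): Nothing = {
    System.err.println(msg)
    System.err.println("Usage: sketch --jar <userJar> --num-workers <n> --worker-memory <mb>")
    sys.exit(1)
  }

  def validate(userJar: String, numWorkers: Int, workerMemoryMb: Int): Unit =
    Seq(
      (userJar == null)                    -> "Error: You must specify a user jar!",
      (numWorkers <= 0)                    -> "Error: You must specify at least 1 worker!",
      (workerMemoryMb <= memoryOverheadMb) ->
        ("Error: Worker memory size must be greater than: " + memoryOverheadMb)
    ).foreach { case (failed, errMsg) => if (failed) exitWithUsage(errMsg) }

  def main(args: Array[String]): Unit = {
    validate(userJar = "app.jar", numWorkers = 2, workerMemoryMb = 1024) // all checks pass
    println("arguments look valid")
  }
}

One subtlety of the version in the diff: the pairs are collected into a Map keyed by Boolean, so conditions with the same truth value overwrite each other and at most one entry per key reaches the foreach. The client still exits whenever any check fails, but the message reported may come from a later check than the first one violated; a Seq of pairs, as in the sketch, keeps every check distinct.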
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 12 additions & 4 deletions
@@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
   // Used to generate a unique id per worker
   private val workerIdCounter = new AtomicInteger()
   private val lastResponseId = new AtomicInteger()
+  private val numWorkersFailed = new AtomicInteger()

   def getNumWorkersRunning: Int = numWorkersRunning.intValue

+  def getNumWorkersFailed: Int = numWorkersFailed.intValue

   def isResourceConstraintSatisfied(container: Container): Boolean = {
     container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
@@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
       else {
         // simply decrement count - next iteration of ReporterThread will take care of allocating !
         numWorkersRunning.decrementAndGet()
-        logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
-          " httpaddress: " + completedContainer.getDiagnostics)
+        logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
+          " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
+
+        // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+        // there are some exit status' we shouldn't necessarily count against us, but for
+        // now I think its ok as none of the containers are expected to exit
+        if (completedContainer.getExitStatus() != 0) {
+          logInfo("Container marked as failed: " + containerId)
+          numWorkersFailed.incrementAndGet()
+        }
       }

       allocatedHostToContainersMap.synchronized {
@@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
     val releasedContainerList = createReleasedContainerList()
     req.addAllReleases(releasedContainerList)

-
-
     if (numWorkers > 0) {
       logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
     }

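On the allocator side, the change is just a thread-safe counter: every completed container with a non-zero exit status bumps numWorkersFailed, which the ApplicationMaster polls through getNumWorkersFailed. The self-contained Scala sketch below illustrates that bookkeeping; CompletedContainer is a hypothetical stand-in for YARN's ContainerStatus, not the real API.

import java.util.concurrent.atomic.AtomicInteger

// Illustrative sketch of the failure bookkeeping added to YarnAllocationHandler (stand-in types).
object FailureCountingSketch {
  final case class CompletedContainer(containerId: String, exitStatus: Int, diagnostics: String)

  private val numWorkersRunning = new AtomicInteger()
  private val numWorkersFailed  = new AtomicInteger()

  def onContainerCompleted(c: CompletedContainer): Unit = {
    numWorkersRunning.decrementAndGet()       // the reporter thread will re-request it later
    if (c.exitStatus != 0) {                  // non-zero exit status counts as a worker failure
      numWorkersFailed.incrementAndGet()
      println(s"Container marked as failed: ${c.containerId} (${c.diagnostics})")
    }
  }

  def main(args: Array[String]): Unit = {
    numWorkersRunning.set(3)
    onContainerCompleted(CompletedContainer("container_01", exitStatus = 1, diagnostics = "OOM"))
    onContainerCompleted(CompletedContainer("container_02", exitStatus = 0, diagnostics = "clean exit"))
    println(s"running=${numWorkersRunning.get}, failed=${numWorkersFailed.get}") // running=1, failed=1
  }
}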