Skip to content

Commit 02a571a

Browse files
committed
SPARK-2425 introduce LOADING -> RUNNING ApplicationState transition
and prevent the Master from removing an Application that still has RUNNING Executors. Conflicts: core/src/main/scala/org/apache/spark/deploy/master/Master.scala core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
1 parent 7a89250 commit 02a571a

File tree

4 files changed

+22
-12
lines changed

4 files changed

+22
-12
lines changed

core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,13 @@ private[spark] class ApplicationInfo(
9090

9191
def retryCount = _retryCount
9292

93-
def incrementRetryCount = {
93+
def incrementRetryCount() = {
9494
_retryCount += 1
9595
_retryCount
9696
}
9797

98+
def resetRetryCount() = _retryCount = 0
99+
98100
def markFinished(endState: ApplicationState.Value) {
99101
state = endState
100102
endTime = System.currentTimeMillis()

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -264,22 +264,29 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
264264
val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
265265
execOption match {
266266
case Some(exec) => {
267+
val appInfo = idToApp(appId)
267268
exec.state = state
269+
if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
268270
exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
269271
if (ExecutorState.isFinished(state)) {
270-
val appInfo = idToApp(appId)
271272
// Remove this executor from the worker and app
272273
logInfo("Removing executor " + exec.fullId + " because it is " + state)
273274
appInfo.removeExecutor(exec)
274275
exec.worker.removeExecutor(exec)
275276

277+
val normalExit = exitStatus == Some(0)
276278
// Only retry certain number of times so we don't go into an infinite loop.
277-
if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
278-
schedule()
279-
} else {
280-
logError("Application %s with ID %s failed %d times, removing it".format(
281-
appInfo.desc.name, appInfo.id, appInfo.retryCount))
282-
removeApplication(appInfo, ApplicationState.FAILED)
279+
if (!normalExit) {
280+
if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
281+
schedule()
282+
} else {
283+
logError("Application %s with ID %s failed %d times, removing it".format(
284+
appInfo.desc.name, appInfo.id, appInfo.retryCount))
285+
val execs = idToApp(appId).executors.values
286+
if (!execs.exists(_.state == ExecutorState.RUNNING)) {
287+
removeApplication(appInfo, ApplicationState.FAILED)
288+
}
289+
}
283290
}
284291
}
285292
}

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,10 @@ private[spark] class ExecutorRunner(
139139
Files.write(header, stderr, Charsets.UTF_8)
140140
CommandUtils.redirectStream(process.getErrorStream, stderr)
141141

142-
// Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
143-
// long-lived processes only. However, in the future, we might restart the executor a few
144-
// times on the same machine.
142+
state = ExecutorState.RUNNING
143+
worker ! ExecutorStateChanged(appId, execId, state, None, None)
144+
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
145+
// or with nonzero exit code
145146
val exitCode = process.waitFor()
146147
state = ExecutorState.FAILED
147148
val message = "Command exited with code " + exitCode

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ private[spark] class Worker(
216216
// ApplicationDescription to be more explicit about this.
217217
val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
218218
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
219-
self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
219+
self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.LOADING)
220220
executors(appId + "/" + execId) = manager
221221
manager.start()
222222
coresUsed += cores_

0 commit comments

Comments (0)