Commit b65bad6

WangTaoTheTonic authored and tgravescs committed
[SPARK-3591][YARN] fire and forget for YARN cluster mode
https://issues.apache.org/jira/browse/SPARK-3591

The output after this patch:

doggie153:/opt/oss/spark-1.3.0-bin-hadoop2.4/bin # ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster ../lib/spark-examples*.jar
15/03/31 21:15:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/31 21:15:25 INFO RMProxy: Connecting to ResourceManager at doggie153/10.177.112.153:8032
15/03/31 21:15:25 INFO Client: Requesting a new application from cluster with 4 NodeManagers
15/03/31 21:15:25 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
15/03/31 21:15:25 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/03/31 21:15:25 INFO Client: Setting up container launch context for our AM
15/03/31 21:15:25 INFO Client: Preparing resources for our AM container
15/03/31 21:15:26 INFO Client: Uploading resource file:/opt/oss/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.4.1.jar -> hdfs://doggie153:9000/user/root/.sparkStaging/application_1427257505534_0016/spark-assembly-1.4.0-SNAPSHOT-hadoop2.4.1.jar
15/03/31 21:15:27 INFO Client: Uploading resource file:/opt/oss/spark-1.3.0-bin-hadoop2.4/lib/spark-examples-1.3.0-hadoop2.4.0.jar -> hdfs://doggie153:9000/user/root/.sparkStaging/application_1427257505534_0016/spark-examples-1.3.0-hadoop2.4.0.jar
15/03/31 21:15:28 INFO Client: Setting up the launch environment for our AM container
15/03/31 21:15:28 INFO SecurityManager: Changing view acls to: root
15/03/31 21:15:28 INFO SecurityManager: Changing modify acls to: root
15/03/31 21:15:28 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/31 21:15:28 INFO Client: Submitting application 16 to ResourceManager
15/03/31 21:15:28 INFO YarnClientImpl: Submitted application application_1427257505534_0016
15/03/31 21:15:28 INFO Client: ... waiting before polling ResourceManager for application state
15/03/31 21:15:33 INFO Client: ... polling ResourceManager for application state
15/03/31 21:15:33 INFO Client: Application report for application_1427257505534_0016 (state: RUNNING)
15/03/31 21:15:33 INFO Client:
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: doggie157
	 ApplicationMaster RPC port: 0
	 queue: default
	 start time: 1427807728307
	 final status: UNDEFINED
	 tracking URL: http://doggie153:8088/proxy/application_1427257505534_0016/
	 user: root

/cc andrewor14

Author: WangTaoTheTonic <[email protected]>

Closes apache#5297 from WangTaoTheTonic/SPARK-3591 and squashes the following commits:

c76d232 [WangTaoTheTonic] wrap lines
16c90a8 [WangTaoTheTonic] move up lines to avoid duplicate
fea390d [WangTaoTheTonic] log failed/killed report, style and comment
be1cc2e [WangTaoTheTonic] reword
f0bc54f [WangTaoTheTonic] minor: expose appid in excepiton messages
ba9b22b [WangTaoTheTonic] wrong config name
e1a4013 [WangTaoTheTonic] revert to the old version and do some robust
19706c0 [WangTaoTheTonic] add a config to control whether to forget
0cbdce8 [WangTaoTheTonic] fire and forget for YARN cluster mode
1 parent ae980eb · commit b65bad6

File tree

4 files changed: +61, -35 lines

core/src/main/scala/org/apache/spark/deploy/Client.scala

Lines changed: 1 addition & 1 deletion
@@ -89,7 +89,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)

   /* Find out driver status then exit the JVM */
   def pollAndReportStatus(driverId: String) {
-    println(s"... waiting before polling master for driver state")
+    println("... waiting before polling master for driver state")
     Thread.sleep(5000)
     println("... polling master for driver state")
     val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
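The change here only drops a stray "s" interpolator prefix from a literal that interpolates nothing. For contrast, a hypothetical variant that would actually need the interpolator (reusing the driverId parameter already in scope):

    // Interpolation is warranted only once the literal embeds an expression:
    println(s"... polling master for state of driver $driverId")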

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestClient.scala

Lines changed: 1 addition & 1 deletion
@@ -245,7 +245,7 @@ private[deploy] class StandaloneRestClient extends Logging {
       }
     } else {
       val failMessage = Option(submitResponse.message).map { ": " + _ }.getOrElse("")
-      logError("Application submission failed" + failMessage)
+      logError(s"Application submission failed$failMessage")
     }
   }
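The fix swaps string concatenation for interpolation. The surrounding Option(...) idiom is worth noting: it turns a possibly-null response message into an optional ": <message>" suffix. A standalone sketch of that idiom (the suffix helper is hypothetical, not part of the patch):

    // Null-safe optional suffix, as used in the logError line above
    def suffix(msg: String): String = Option(msg).map { ": " + _ }.getOrElse("")
    assert(suffix(null) == "")                       // absent message: empty suffix
    assert(suffix("bad request") == ": bad request") // present message: prefixed with ": "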

docs/running-on-yarn.md

Lines changed: 9 additions & 0 deletions
@@ -196,6 +196,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
     It should be no larger than the global number of max attempts in the YARN configuration.
   </td>
 </tr>
+<tr>
+  <td><code>spark.yarn.submit.waitAppCompletion</code></td>
+  <td>true</td>
+  <td>
+  In YARN cluster mode, controls whether the client waits to exit until the application completes.
+  If set to true, the client process will stay alive reporting the application's status.
+  Otherwise, the client process will exit after submission.
+  </td>
+</tr>
 </table>

 # Launching Spark on YARN
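In practice the new property would be set at submit time. For example, a hypothetical invocation mirroring the one in the commit message (class and jar path are illustrative):

    ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
      --master yarn-cluster \
      --conf spark.yarn.submit.waitAppCompletion=false \
      lib/spark-examples-1.3.0-hadoop2.4.0.jar

With the property set to false, the submitting process logs a single application report and exits right after submission instead of polling until the application completes.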

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 50 additions & 33 deletions
@@ -66,6 +66,8 @@ private[spark] class Client(
   private val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
   private val isClusterMode = args.isClusterMode
+  private val fireAndForget = isClusterMode &&
+    !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)

   def stop(): Unit = yarnClient.stop()
@@ -564,31 +566,13 @@

       if (logApplicationReport) {
         logInfo(s"Application report for $appId (state: $state)")
-        val details = Seq[(String, String)](
-          ("client token", getClientToken(report)),
-          ("diagnostics", report.getDiagnostics),
-          ("ApplicationMaster host", report.getHost),
-          ("ApplicationMaster RPC port", report.getRpcPort.toString),
-          ("queue", report.getQueue),
-          ("start time", report.getStartTime.toString),
-          ("final status", report.getFinalApplicationStatus.toString),
-          ("tracking URL", report.getTrackingUrl),
-          ("user", report.getUser)
-        )
-
-        // Use more loggable format if value is null or empty
-        val formattedDetails = details
-          .map { case (k, v) =>
-            val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
-            s"\n\t $k: $newValue" }
-          .mkString("")

         // If DEBUG is enabled, log report details every iteration
         // Otherwise, log them every time the application changes state
         if (log.isDebugEnabled) {
-          logDebug(formattedDetails)
+          logDebug(formatReportDetails(report))
         } else if (lastState != state) {
-          logInfo(formattedDetails)
+          logInfo(formatReportDetails(report))
         }
       }

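The report-formatting block deleted above is not lost: the next hunk hoists it into a private formatReportDetails helper, so the monitoring loop and the new fire-and-forget path render reports identically, in the shape seen in the commit-message log (client token, diagnostics, ApplicationMaster host, and so on, one "key: value" pair per indented line).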
@@ -609,24 +593,57 @@
       throw new SparkException("While loop is depleted! This should never happen...")
   }

+  private def formatReportDetails(report: ApplicationReport): String = {
+    val details = Seq[(String, String)](
+      ("client token", getClientToken(report)),
+      ("diagnostics", report.getDiagnostics),
+      ("ApplicationMaster host", report.getHost),
+      ("ApplicationMaster RPC port", report.getRpcPort.toString),
+      ("queue", report.getQueue),
+      ("start time", report.getStartTime.toString),
+      ("final status", report.getFinalApplicationStatus.toString),
+      ("tracking URL", report.getTrackingUrl),
+      ("user", report.getUser)
+    )
+
+    // Use more loggable format if value is null or empty
+    details.map { case (k, v) =>
+      val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+      s"\n\t $k: $newValue"
+    }.mkString("")
+  }
+
   /**
-   * Submit an application to the ResourceManager and monitor its state.
-   * This continues until the application has exited for any reason.
+   * Submit an application to the ResourceManager.
+   * If spark.yarn.submit.waitAppCompletion is set to true, the client will stay
+   * alive reporting the application's status until the application has exited
+   * for any reason. Otherwise, the client process will exit after submission.
    * If the application finishes with a failed, killed, or undefined status,
    * throw an appropriate SparkException.
    */
   def run(): Unit = {
-    val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
-    if (yarnApplicationState == YarnApplicationState.FAILED ||
-      finalApplicationStatus == FinalApplicationStatus.FAILED) {
-      throw new SparkException("Application finished with failed status")
-    }
-    if (yarnApplicationState == YarnApplicationState.KILLED ||
-      finalApplicationStatus == FinalApplicationStatus.KILLED) {
-      throw new SparkException("Application is killed")
-    }
-    if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
-      throw new SparkException("The final status of application is undefined")
+    val appId = submitApplication()
+    if (fireAndForget) {
+      val report = getApplicationReport(appId)
+      val state = report.getYarnApplicationState
+      logInfo(s"Application report for $appId (state: $state)")
+      logInfo(formatReportDetails(report))
+      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+        throw new SparkException(s"Application $appId finished with status: $state")
+      }
+    } else {
+      val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
+      if (yarnApplicationState == YarnApplicationState.FAILED ||
+        finalApplicationStatus == FinalApplicationStatus.FAILED) {
+        throw new SparkException(s"Application $appId finished with failed status")
+      }
+      if (yarnApplicationState == YarnApplicationState.KILLED ||
+        finalApplicationStatus == FinalApplicationStatus.KILLED) {
+        throw new SparkException(s"Application $appId is killed")
+      }
+      if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+        throw new SparkException(s"The final status of application $appId is undefined")
+      }
     }
   }
 }

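Taken together, the new run() has a simple shape: submit, then either sample one report and return, or keep polling until a terminal state. Below is a minimal, self-contained sketch of that pattern against the plain YARN client API. The submit() stub, the 1-second poll interval, and the use of println/sys.error in place of Spark's logging and SparkException are simplifications of this sketch, not Spark's actual code.

    import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus, YarnApplicationState}
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    object FireAndForgetSketch {
      // Assumption: submission itself is out of scope here. Spark's
      // Client.submitApplication builds and submits the AM container
      // launch context and returns the new application id.
      def submit(yarn: YarnClient): ApplicationId = ???

      def run(waitAppCompletion: Boolean): Unit = {
        val yarn = YarnClient.createYarnClient()
        yarn.init(new YarnConfiguration())
        yarn.start()
        val appId = submit(yarn)

        if (!waitAppCompletion) {
          // Fire and forget: log a single report, then fail fast
          // only if the application is already dead.
          val report = yarn.getApplicationReport(appId)
          val state = report.getYarnApplicationState
          println(s"Application report for $appId (state: $state)")
          if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
            sys.error(s"Application $appId finished with status: $state")
          }
        } else {
          // Original behavior: poll until the application reaches a terminal state.
          var state = yarn.getApplicationReport(appId).getYarnApplicationState
          while (state != YarnApplicationState.FINISHED &&
                 state != YarnApplicationState.FAILED &&
                 state != YarnApplicationState.KILLED) {
            Thread.sleep(1000) // assumed poll interval; Spark uses its own setting
            state = yarn.getApplicationReport(appId).getYarnApplicationState
          }
          val finalStatus = yarn.getApplicationReport(appId).getFinalApplicationStatus
          if (state == YarnApplicationState.FAILED || finalStatus == FinalApplicationStatus.FAILED) {
            sys.error(s"Application $appId finished with failed status")
          }
          if (state == YarnApplicationState.KILLED || finalStatus == FinalApplicationStatus.KILLED) {
            sys.error(s"Application $appId is killed")
          }
          if (finalStatus == FinalApplicationStatus.UNDEFINED) {
            sys.error(s"The final status of application $appId is undefined")
          }
        }
      }
    }

The design point mirrored here is that fire-and-forget still surfaces an immediately FAILED or KILLED application with a non-zero exit, so obviously broken submissions are not silently forgotten.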