[SPARK-31486] [CORE] spark.submit.waitAppCompletion flag to control spark-submit exit in Standalone Cluster Mode #28258
```diff
@@ -61,6 +61,10 @@ private class ClientEndpoint(

   private val lostMasters = new HashSet[RpcAddress]
   private var activeMasterEndpoint: RpcEndpointRef = null
+  private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion",
+    false)
+  private val REPORT_DRIVER_STATUS_INTERVAL = 1000
+
   private def getProperty(key: String, conf: SparkConf): Option[String] = {
     sys.props.get(key).orElse(conf.getOption(key))
```
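For reference, `conf.getBoolean` resolves the key against the configuration and falls back to the supplied default, so the new flag defaults to `false` and preserves the existing fire-and-forget behaviour. A minimal stand-in (not Spark's actual `SparkConf`; the backing `Map` here is hypothetical) behaves like this:

```scala
// Minimal stand-in for the boolean-property lookup the new flag relies on.
// NOT Spark code: `settings` is a hypothetical backing store; SparkConf
// resolves keys the same way and falls back to the given default.
object WaitFlagDemo {
  private val settings = Map("spark.standalone.submit.waitAppCompletion" -> "true")

  def getBoolean(key: String, defaultValue: Boolean): Boolean =
    settings.get(key).map(_.trim.toBoolean).getOrElse(defaultValue)
}
```

With the key unset, the default (`false`) wins, which is why existing deployments are unaffected unless they opt in.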
```diff
@@ -124,38 +128,58 @@ private class ClientEndpoint(
     }
   }

-  /* Find out driver status then exit the JVM */
+  /**
+   * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors
+   * the application until it finishes, fails or is killed.
+   */
   def pollAndReportStatus(driverId: String): Unit = {
     // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
     // is fine.
     logInfo("... waiting before polling master for driver state")
     Thread.sleep(5000)
     logInfo("... polling master for driver state")
-    val statusResponse =
-      activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
-    if (statusResponse.found) {
-      logInfo(s"State of $driverId is ${statusResponse.state.get}")
-      // Worker node, if present
-      (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
-        case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
-          logInfo(s"Driver running on $hostPort ($id)")
-        case _ =>
-      }
-      // Exception, if present
-      statusResponse.exception match {
-        case Some(e) =>
-          logError(s"Exception from cluster was: $e")
-          e.printStackTrace()
-          System.exit(-1)
-        case _ =>
-          System.exit(0)
-      }
+    while (true) {
```
Member: This could block

Member: Hey, please pay attention to my comment here. I believe the current implementation could block

Contributor (Author): @Ngone51 Apologies, somehow I missed this comment. How can I quickly verify this? I am looking into it. Could you kindly suggest any pointers on how this can be fixed?

Member: We can periodically send a message (e.g. we can send it after

Member: A possible way to verify this is to launch a long-running application, shut down the Master in the middle, and see whether

Contributor (Author): @Ngone51 I launched a long-running application with the flag enabled and disabled and stopped the Spark Master in the middle. In both cases, I see the following in the driver logs. I couldn't find any difference in the logs.

Contributor (Author): @Ngone51 Thanks for this suggestion. Just to confirm, are you suggesting to do this at line 180 in the pollAndReportStatus method? Or should we handle this outside?

Member: Hi @akshatb1, logs are from
I think just after line 180 should be ok.

Contributor (Author): @Ngone51 Yes, not sure about the logs from

Contributor (Author): Hi @Ngone51, I tried putting periodic messages in the loop in
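The reviewers' suggestion above — send the endpoint a message periodically instead of blocking its event loop in `while(true)`/`Thread.sleep` — can be sketched roughly as below. This is not Spark's `RpcEndpoint` API: the `PollDriverStatus` message, the `inbox` queue, and the dispatch loop are hypothetical stand-ins for illustration only.

```scala
import scala.collection.mutable

// Hedged sketch (NOT Spark's RPC machinery) of the self-message pattern:
// instead of spinning on the event loop thread, the endpoint re-enqueues
// a poll message to itself, so other messages can interleave between polls.
object SelfMessageSketch {
  case object PollDriverStatus // hypothetical self-message

  private val inbox = mutable.Queue[Any](PollDriverStatus)
  private var polls = 0

  private def receive(msg: Any): Unit = msg match {
    case PollDriverStatus =>
      polls += 1                                      // stand-in for asking the Master for status
      if (polls < 3) inbox.enqueue(PollDriverStatus)  // schedule the next poll
  }

  // Drain the inbox, dispatching each message; returns how many polls ran.
  def run(): Int = {
    while (inbox.nonEmpty) receive(inbox.dequeue())
    polls
  }
}
```

The key property is that between two `PollDriverStatus` deliveries, any other queued message would be dispatched, which a blocking loop cannot do.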
Contributor: nit: indents.

Contributor (Author): Thanks, updated the indentation in the latest commit.

```diff
+      val statusResponse =
+        activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
+      if (statusResponse.found) {
+        logInfo(s"State of $driverId is ${statusResponse.state.get}")
+        // Worker node, if present
+        (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
+          case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
+            logInfo(s"Driver running on $hostPort ($id)")
+          case _ =>
+        }
+        // Exception, if present
+        statusResponse.exception match {
+          case Some(e) =>
+            logError(s"Exception from cluster was: $e")
+            e.printStackTrace()
+            System.exit(-1)
+          case _ =>
+            if (!waitAppCompletion) {
+              logInfo(s"spark-submit not configured to wait for completion, " +
+                s"exiting spark-submit JVM.")
+              System.exit(0)
+            } else {
+              statusResponse.state.get match {
+                case DriverState.FINISHED | DriverState.FAILED |
+                    DriverState.ERROR | DriverState.KILLED =>
+                  logInfo(s"State of $driverId is ${statusResponse.state.get}, " +
+                    s"exiting spark-submit JVM.")
+                  System.exit(0)
+                case _ =>
+                  logTrace(s"State of $driverId is ${statusResponse.state.get}, " +
+                    s"continue monitoring driver status.")
+              }
+            }
+        }
+      } else {
+        logError(s"ERROR: Cluster master did not recognize $driverId")
+        System.exit(-1)
+      }
-    } else {
-      logError(s"ERROR: Cluster master did not recognize $driverId")
-      System.exit(-1)
-    }
+      Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)
+    }
   }
```
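The control flow this diff introduces can be modeled as a small self-contained loop. The sketch below is a simplified stand-in, not the real `ClientEndpoint`: `DriverState` is reduced to two cases, the iterator replaces the repeated `askSync(RequestDriverStatus)` calls, and returned strings replace log output and `System.exit` decisions.

```scala
import scala.collection.mutable.ListBuffer

// Simplified model of the polling loop added by this PR (NOT the real
// ClientEndpoint): consume successive driver states, exit on a terminal
// state when waitAppCompletion is true, otherwise exit after one poll.
object PollSketch {
  sealed trait DriverState
  case object RUNNING extends DriverState
  case object FINISHED extends DriverState // stands in for FINISHED/FAILED/ERROR/KILLED

  def poll(states: Iterator[DriverState], waitAppCompletion: Boolean): List[String] = {
    val log = ListBuffer[String]()
    var exited = false
    while (!exited) {
      val state = states.next() // stand-in for askSync(RequestDriverStatus)
      if (!waitAppCompletion) {
        log += "spark-submit not configured to wait for completion, exiting spark-submit JVM"
        exited = true
      } else state match {
        case FINISHED =>
          log += s"State is $state, exiting spark-submit JVM"
          exited = true
        case _ =>
          log += s"State is $state, continue monitoring driver status"
          // the real loop sleeps REPORT_DRIVER_STATUS_INTERVAL ms here
      }
    }
    log.toList
  }
}
```

With `waitAppCompletion = false` the loop exits after a single poll (the pre-PR behaviour); with `true` it keeps polling until a terminal state arrives.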
```diff
   override def receive: PartialFunction[Any, Unit] = {

     case SubmitDriverResponse(master, success, driverId, message) =>
```
```diff
@@ -374,6 +374,24 @@ To run an interactive Spark shell against the cluster, run the following command

 You can also pass an option `--total-executor-cores <numCores>` to control the number of cores that spark-shell uses on the cluster.

+# Spark Properties
+
+Spark applications support the following configuration properties specific to standalone mode:
+
+<table class="table">
+  <tr><th style="width:21%">Property Name</th><th>Default Value</th><th>Meaning</th><th>Since Version</th></tr>
+  <tr>
+    <td><code>spark.standalone.submit.waitAppCompletion</code></td>
+    <td><code>false</code></td>
+    <td>
+      In standalone cluster mode, controls whether the client waits to exit until the application completes.
+      If set to <code>true</code>, the client process will stay alive reporting the application's status.
+      Otherwise, the client process will exit after submission.
+    </td>
+    <td>3.1.0</td>
+  </tr>
+</table>
+
 # Launching Spark Applications

 The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to
```

Member: Standalone -> standalone

Contributor (Author): Updated in the latest commit.
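A usage sketch for the documented property: the invocation below is hypothetical (the master URL, class name, and jar path are placeholders, not from this PR), showing how the flag would be passed so spark-submit stays alive until the driver reaches a terminal state.

```shell
# Hypothetical example: submit in standalone cluster mode and keep the
# spark-submit JVM alive until the application finishes, fails, or is killed.
# master URL, --class, and jar path are placeholders.
./bin/spark-submit \
  --master spark://master-host:7077 \
  --deploy-mode cluster \
  --conf spark.standalone.submit.waitAppCompletion=true \
  --class com.example.MyApp \
  /path/to/my-app.jar
```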