Skip to content
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 57 additions & 24 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ private class ClientEndpoint(

private val lostMasters = new HashSet[RpcAddress]
private var activeMasterEndpoint: RpcEndpointRef = null
private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion",
Comment thread
srowen marked this conversation as resolved.
false)
private val REPORT_DRIVER_STATUS_INTERVAL = 1000
private var submittedDriverID = ""


private def getProperty(key: String, conf: SparkConf): Option[String] = {
sys.props.get(key).orElse(conf.getOption(key))
Expand Down Expand Up @@ -124,44 +129,53 @@ private class ClientEndpoint(
}
}

/* Find out driver status then exit the JVM */
/**
* Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors
* the application until it finishes, fails or is killed.
*/
def pollAndReportStatus(driverId: String): Unit = {
// Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread
// is fine.
logInfo("... waiting before polling master for driver state")
Thread.sleep(5000)
logInfo("... polling master for driver state")
val statusResponse =
activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
if (statusResponse.found) {
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
// Exception, if present
statusResponse.exception match {
case Some(e) =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
case _ =>
System.exit(0)
val statusResponse =

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indents.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated the indentation in the latest commit.

activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId))
if (statusResponse.found) {
logInfo(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
logInfo(s"Driver running on $hostPort ($id)")
case _ =>
}
// Exception, if present
statusResponse.exception match {
case Some(e) =>
logError(s"Exception from cluster was: $e")
e.printStackTrace()
System.exit(-1)
case _ =>
if (!waitAppCompletion) {
logInfo(s"spark-submit not configured to wait for completion, " +
s"exiting spark-submit JVM.")
System.exit(0)
} else {
asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(driverId))
}
}
} else {
logError(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
}
} else {
logError(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
}
}

override def receive: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(master, success, driverId, message) =>
logInfo(message)
if (success) {
activeMasterEndpoint = master
submittedDriverID = driverId.get
pollAndReportStatus(driverId.get)
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
Expand All @@ -176,6 +190,25 @@ private class ClientEndpoint(
} else if (!Utils.responseFromBackup(message)) {
System.exit(-1)
}

case DriverStatusResponse(found, state, _, _, _) =>

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better if we could do some refactor on pollAndReportStatus in order to reduce some duplicate logic.
For example, we can only call pollAndReportStatus here, and remove other invocations in SubmitDriverResponse/ KillDriverResponse. And, of course, the pollAndReportStatus(it also needs a new name) will not poll the status anymore.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ngone51 Thanks for your feedback. pollAndReportStatus is only being used the first time after submitting or killing drivers. I am not sure which is the duplicate logic you are referring to. Also, pollAndReportStatus is only polling the driver status and handling the response. If we removing polling from that, what logic should be handled there?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we removing polling from that, what logic should be handled there?

we use this:

forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
          MonitorDriverStatus()
        }, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)

(but the initial delay need to change)

in this way, submitting or killing drivers will still use it only for one time when waitAppCompletion=false.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scheduling to monitor driver status is done only in case of submit and not in kill as of now. So we may need to explicitly send a message to monitor driver status after 5 seconds delay in case of kill.

@Ngone51 Ngone51 May 21, 2020

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's why I said we need to change the delay (e.g. 5s) instead of 0 for both submiting and killing.

@akshatb1 akshatb1 May 21, 2020

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we can change the delay to 5 seconds to keep it consistent with current logic. My question is that should we add the following block in case "kill" => as well or should we just monitor with a single message instead of scheduled messages?
forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError { MonitorDriverStatus() }, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need to add forwardMessageThread.scheduleAtFixedRate(...) into any case branches but just put it as a global one(just do what you do now). I think it still works for case "kill".

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in the case "launch" as of now. I will move it to a global place and refactor the code. Thanks for your suggestions.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, sorry miss that. yea, thank you!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ngone51 I have refactored the code as suggested. Kindly review it again. Thanks.

if (found) {
state.get match {
case DriverState.FINISHED | DriverState.FAILED |
DriverState.ERROR | DriverState.KILLED =>
logInfo(s"State of $submittedDriverID is ${state.get}, " +

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s"State of driver $submittedDriverID ..."

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated in the latest commit.

s"exiting spark-submit JVM.")
System.exit(0)
case _ =>
Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thread.sleep could still has the same issue, imaging the network drop happens during sleeping. We should control the period sending logic out of receive. We could mimic CoarseGrainedSchedulerBackend to do the same work here:

reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ReviveOffers))
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)

@akshatb1 akshatb1 May 16, 2020

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ngone51 Thanks for reviewing. I have updated to use the task scheduler to do the same. Could you kindly review it again and please let me know your comments?

logInfo(s"State of $submittedDriverID is ${state.get}, " +

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s"State of driver $submittedDriverID ..."

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since status polling will happen every second, I'm afraid logs can be too verbose. We can log it after a constant polling times, e.g. log every 60 times.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would produce too much logs, please consider using 'logDebug'

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have changed it to use logDebug.

s"continue monitoring driver status.")
asyncSendToMasterAndForwardReply[DriverStatusResponse](

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you submit too much applications at the same time, this could lead to heavy communication burden to the driver. I would suggest check less frequent (like increasing the interval to 10s)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jiangxb1987 Thanks for reviewing. I have changed it to 10 seconds and took care of your other comments. Kindly review the PR again.

RequestDriverStatus(submittedDriverID))
}
} else {
System.exit(-1)
}
}

override def onDisconnected(remoteAddress: RpcAddress): Unit = {
Expand Down
19 changes: 19 additions & 0 deletions docs/spark-standalone.md
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,25 @@ To run an interactive Spark shell against the cluster, run the following command

You can also pass an option `--total-executor-cores <numCores>` to control the number of cores that spark-shell uses on the cluster.

# Client Properties

Spark applications supports the following configuration properties specific to standalone mode:

<table class="table">
Comment thread
akshatb1 marked this conversation as resolved.
<tr><th style="width:21%">Property Name</th><th>Default Value</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
<td><code>spark.standalone.submit.waitAppCompletion</code></td>
<td><code>false</code></td>
<td>
In standalone cluster mode, controls whether the client waits to exit until the application completes.
If set to <code>true</code>, the client process will stay alive polling the application's status.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: application's or driver?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, updated to driver's.

Otherwise, the client process will exit after submission.
</td>
<td>3.1.0</td>
</tr>
</table>


# Launching Spark Applications

The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to
Expand Down