Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 50 additions & 19 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,25 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{ActorLogReceive, AkkaUtils, Utils}
import scala.collection.mutable.HashSet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please reorganize the imports


/**
* Proxy that relays messages to the driver.
*/
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
extends Actor with ActorLogReceive with Logging {

var masterActor: ActorSelection = _
val mastersActor = driverArgs.masters.map { m =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would call this masterActors

context.actorSelection(Master.toAkkaUrl(m, AkkaUtils.protocol(context.system)))
}
val lostMasters = new HashSet[Address]
var activeMasterActor: ActorSelection = null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you make these all private


val timeout = AkkaUtils.askTimeout(conf)

override def preStart(): Unit = {
masterActor = context.actorSelection(
Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(context.system)))

context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")

driverArgs.cmd match {
case "launch" =>
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
Expand Down Expand Up @@ -79,11 +80,15 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
driverArgs.supervise,
command)

for (masterActor <- mastersActor) {
masterActor ! RequestSubmitDriver(driverDescription)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to submit to all masters right? I believe HA means we want to failover to backup Masters in case the primary one fails.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, it seems that we guard against this in this line and already do something similar in client mode. Can we add a comment here (and in L89) that says something like

// This assumes only one Master is active at a time


case "kill" =>
val driverId = driverArgs.driverId
for (masterActor <- mastersActor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to indent these

masterActor ! RequestKillDriver(driverId)
}
}
}

Expand All @@ -92,14 +97,15 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
println(s"... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
.mapTo[DriverStatusResponse]
val statusResponse = Await.result(statusFuture, timeout)

statusResponse.found match {
case false =>
println(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
statusResponse.exception.getOrElse {
println(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes behavior. If statusResponse.exception is defined then this doesn't exit. I would just revert this change.

case true =>
println(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
Expand All @@ -115,27 +121,50 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
System.exit(-1)
}
System.exit(0)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unindent

}

override def receiveWithLogging: PartialFunction[Any, Unit] = {

case SubmitDriverResponse(success, driverId, message) =>
println(message)
if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
if (success) {
activeMasterActor = context.actorSelection(sender.path)
pollAndReportStatus(driverId.get)
} else if (!message.contains("Can only")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain what "Can only" is?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, it's from this message: Can only accept driver submissions in ALIVE state.

System.exit(-1)
}


case KillDriverResponse(driverId, success, message) =>
println(message)
if (success) pollAndReportStatus(driverId) else System.exit(-1)
if (success) {
activeMasterActor = context.actorSelection(sender.path)
pollAndReportStatus(driverId)
} else if (!message.contains("Can only")) {
System.exit(-1)
}

case DisassociatedEvent(_, remoteAddress, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
System.exit(-1)
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master $remoteAddress.")
lostMasters += remoteAddress
if (lostMasters.size >= mastersActor.size) {
println(s"No master is available, exiting.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for the "s" here

System.exit(-1)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is safe to do. A failed master can come back up, but according to this code it will still be in lostMasters. We need to somehow detect when a failed master has recovered.


case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
println(s"Cause was: $cause")
System.exit(-1)
if (!lostMasters.contains(remoteAddress)) {
println(s"Error connecting to master ($remoteAddress).")
println(s"Cause was: $cause")
lostMasters += remoteAddress
if (lostMasters.size >= mastersActor.size) {
println(s"No master is available, exiting.")
System.exit(-1)
}
}
}
}

Expand Down Expand Up @@ -163,7 +192,9 @@ object Client {
"driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

// Verify driverArgs.master is a valid url so that we can use it in ClientActor safely
Master.toAkkaUrl(driverArgs.master, AkkaUtils.protocol(actorSystem))
for (m <- driverArgs.masters) {
Master.toAkkaUrl(m, AkkaUtils.protocol(actorSystem))
}
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))

actorSystem.awaitTermination()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[deploy] class ClientArguments(args: Array[String]) {
var logLevel = Level.WARN

// launch parameters
var master: String = ""
var masters: Array[String] = null
var jarUrl: String = ""
var mainClass: String = ""
var supervise: Boolean = DEFAULT_SUPERVISE
Expand Down Expand Up @@ -80,13 +80,13 @@ private[deploy] class ClientArguments(args: Array[String]) {
}

jarUrl = _jarUrl
master = _master
masters = _master.stripPrefix("spark://").split(",").map("spark://" + _)
mainClass = _mainClass
_driverOptions ++= tail

case "kill" :: _master :: _driverId :: tail =>
cmd = "kill"
master = _master
masters = _master.stripPrefix("spark://").split(",").map("spark://" + _)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you put this in a private def?

driverId = _driverId

case _ =>
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,17 @@ object SparkSubmit {

/** Kill an existing submission using the REST protocol. Standalone cluster mode only. */
private def kill(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
.killSubmission(args.master, args.submissionToKill)
new StandaloneRestClient(args.master)
.killSubmission(args.submissionToKill)
}

/**
* Request the status of an existing submission using the REST protocol.
* Standalone cluster mode only.
*/
private def requestStatus(args: SparkSubmitArguments): Unit = {
new StandaloneRestClient()
.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
new StandaloneRestClient(args.master)
.requestSubmissionStatus(args.submissionToRequestStatusFor)
}

/**
Expand Down
17 changes: 11 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,17 @@ private[master] class Master(
}

case RequestDriverStatus(driverId) => {
(drivers ++ completedDrivers).find(_.id == driverId) match {
case Some(driver) =>
sender ! DriverStatusResponse(found = true, Some(driver.state),
driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
case None =>
sender ! DriverStatusResponse(found = false, None, None, None, None)
if (state != RecoveryState.ALIVE) {
val msg = s"Can only request driver status in ALIVE state. Current state: $state."
sender ! DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg)))
} else {
(drivers ++ completedDrivers).find(_.id == driverId) match {
case Some(driver) =>
sender ! DriverStatusResponse(found = true, Some(driver.state),
driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
case None =>
sender ! DriverStatusResponse(found = false, None, None, None, None)
}
}
}

Expand Down
Loading