Skip to content

Commit fd5259d

Browse files
committed
Address review comments.
1 parent 1553230 commit fd5259d

22 files changed

+261
-381
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
2222
import java.net.URL
2323
import java.security.PrivilegedExceptionAction
2424

25+
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
26+
2527
import org.apache.hadoop.fs.Path
2628
import org.apache.hadoop.security.UserGroupInformation
2729
import org.apache.ivy.Ivy
@@ -38,7 +40,6 @@ import org.apache.spark.SPARK_VERSION
3840
import org.apache.spark.deploy.rest._
3941
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
4042

41-
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
4243

4344
/**
4445
* Whether to submit, kill, or request the status of an application.
@@ -380,10 +381,6 @@ object SparkSubmit {
380381
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
381382
sysProp = "spark.driver.extraLibraryPath"),
382383

383-
// Standalone cluster only
384-
// Do not set CL arguments here because there are multiple possibilities for the main class
385-
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"),
386-
387384
// Yarn client only
388385
OptionAssigner(args.queue, YARN, CLIENT, sysProp = "spark.yarn.queue"),
389386
OptionAssigner(args.numExecutors, YARN, CLIENT, sysProp = "spark.executor.instances"),
@@ -418,7 +415,8 @@ object SparkSubmit {
418415
OptionAssigner(args.driverCores, STANDALONE | MESOS, CLUSTER,
419416
sysProp = "spark.driver.cores"),
420417
OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
421-
sysProp = "spark.driver.supervise")
418+
sysProp = "spark.driver.supervise"),
419+
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy")
422420
)
423421

424422
// In client mode, launch the application main class directly
@@ -457,7 +455,7 @@ object SparkSubmit {
457455
// All Spark parameters are expected to be passed to the client through system properties.
458456
if (args.isStandaloneCluster) {
459457
if (args.useRest) {
460-
childMainClass = "org.apache.spark.deploy.rest.RestClient"
458+
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
461459
childArgs += (args.primaryResource, args.mainClass)
462460
} else {
463461
// In legacy standalone cluster mode, use Client as a wrapper around the user class
@@ -503,7 +501,7 @@ object SparkSubmit {
503501

504502
if (isMesosCluster) {
505503
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
506-
childMainClass = "org.apache.spark.deploy.rest.RestClient"
504+
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
507505
childArgs += (args.primaryResource, args.mainClass)
508506
if (args.childArgs != null) {
509507
childArgs ++= args.childArgs

core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.mesos
1919

2020
import java.util.concurrent.CountDownLatch
2121

22-
import org.apache.spark
2322
import org.apache.spark.deploy.mesos.ui.MesosClusterUI
2423
import org.apache.spark.deploy.rest.mesos.MesosRestServer
2524
import org.apache.spark.scheduler.cluster.mesos._
@@ -50,17 +49,14 @@ private[mesos] class MesosClusterDispatcher(
5049
conf: SparkConf)
5150
extends Logging {
5251

53-
private def publicAddress(conf: SparkConf, defaultAddress: String): String = {
54-
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
55-
if (envVar != null) envVar else defaultAddress
56-
}
57-
52+
private val publicAddress = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
5853
private val recoveryMode = conf.get("spark.mesos.deploy.recoveryMode", "NONE").toUpperCase()
5954
logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
6055

6156
private val engineFactory = recoveryMode match {
6257
case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
6358
case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
59+
case _ => throw new IllegalArgumentException("Unsupported recovery mode: " + recoveryMode)
6460
}
6561

6662
private val scheduler = new MesosClusterScheduler(engineFactory, conf)
@@ -70,7 +66,7 @@ private[mesos] class MesosClusterDispatcher(
7066
new SecurityManager(conf),
7167
args.webUiPort,
7268
conf,
73-
publicAddress(conf, args.host),
69+
publicAddress,
7470
scheduler)
7571

7672
private val shutdownLatch = new CountDownLatch(1)
@@ -94,7 +90,7 @@ private[mesos] class MesosClusterDispatcher(
9490
}
9591
}
9692

97-
private[mesos] object MesosClusterDispatcher extends spark.Logging {
93+
private[mesos] object MesosClusterDispatcher extends Logging {
9894
def main(args: Array[String]) {
9995
SignalLogger.register(log)
10096
val conf = new SparkConf
@@ -105,9 +101,7 @@ private[mesos] object MesosClusterDispatcher extends spark.Logging {
105101
conf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
106102
conf.set("spark.mesos.deploy.zookeeper.url", z)
107103
}
108-
val dispatcher = new MesosClusterDispatcher(
109-
dispatcherArgs,
110-
conf)
104+
val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
111105
dispatcher.start()
112106
val shutdownHook = new Thread() {
113107
override def run() {

core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
7474
case Nil => {
7575
if (masterUrl == null) {
7676
System.err.println("--master is required")
77-
System.exit(1)
77+
printUsageAndExit(1)
7878
}
7979
}
8080

core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@ package org.apache.spark.deploy.mesos
2020
import java.util.Date
2121

2222
import org.apache.spark.deploy.Command
23-
import org.apache.spark.scheduler.cluster.mesos.RetryState
23+
import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
2424

2525
/**
2626
* Describes a Spark driver that is submitted from the
2727
* [[org.apache.spark.deploy.rest.mesos.MesosRestServer]], to be launched by
2828
* [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
2929
* @param jarUrl URL to the application jar
3030
* @param mem Amount of memory for the driver
31-
* @param cores Amount of cores for the driver
31+
* @param cores Number of cores for the driver
3232
* @param supervise Supervise the driver for long running app
3333
* @param command The command to launch the driver.
3434
* @param schedulerProperties Extra properties to pass the Mesos scheduler
@@ -43,8 +43,9 @@ private[spark] class MesosDriverDescription(
4343
val schedulerProperties: Map[String, String],
4444
val submissionId: String,
4545
val submissionDate: Date,
46-
val retryState: Option[RetryState] = None)
46+
val retryState: Option[MesosClusterRetryState] = None)
4747
extends Serializable {
48+
4849
def copy(
4950
name: String = name,
5051
jarUrl: String = jarUrl,
@@ -53,11 +54,12 @@ private[spark] class MesosDriverDescription(
5354
supervise: Boolean = supervise,
5455
command: Command = command,
5556
schedulerProperties: Map[String, String] = schedulerProperties,
56-
retryState: Option[RetryState] = retryState,
5757
submissionId: String = submissionId,
58-
submissionDate: Date = submissionDate): MesosDriverDescription = {
58+
submissionDate: Date = submissionDate,
59+
retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
5960
new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, schedulerProperties,
6061
submissionId, submissionDate, retryState)
6162
}
63+
6264
override def toString: String = s"MesosDriverDescription (${command.mainClass})"
6365
}

core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterPage.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ package org.apache.spark.deploy.mesos.ui
1919

2020
import javax.servlet.http.HttpServletRequest
2121

22+
import scala.xml.Node
23+
2224
import org.apache.mesos.Protos.TaskStatus
2325
import org.apache.spark.deploy.mesos.MesosDriverDescription
24-
import org.apache.spark.scheduler.cluster.mesos.MesosClusterTaskState
26+
import org.apache.spark.scheduler.cluster.mesos.MesosClusterSubmissionState
2527
import org.apache.spark.ui.{UIUtils, WebUIPage}
2628

27-
import scala.xml.Node
28-
2929
private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage("") {
3030
def render(request: HttpServletRequest): Seq[Node] = {
31-
val state = parent.scheduler.getState()
31+
val state = parent.scheduler.getSchedulerState()
3232
val queuedHeaders = Seq("Driver ID", "Submit Date", "Main Class", "Driver Resources")
3333
val driverHeaders = queuedHeaders ++
3434
Seq("Start Date", "Mesos Slave ID", "State")
@@ -37,7 +37,7 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
3737
val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
3838
val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
3939
val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
40-
val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.retryList)
40+
val retryTable = UIUtils.listingTable(retryHeaders, retryRow, state.pendingRetryDrivers)
4141
val content =
4242
<p>Mesos Framework ID: {state.frameworkId}</p>
4343
<div class="row-fluid">
@@ -64,15 +64,15 @@ private[mesos] class MesosClusterPage(parent: MesosClusterUI) extends WebUIPage(
6464
</tr>
6565
}
6666

67-
private def driverRow(state: MesosClusterTaskState): Seq[Node] = {
67+
private def driverRow(state: MesosClusterSubmissionState): Seq[Node] = {
6868
<tr>
69-
<td>{state.submission.submissionId}</td>
70-
<td>{state.submission.submissionDate}</td>
71-
<td>{state.submission.command.mainClass}</td>
72-
<td>cpus: {state.submission.cores}, mem: {state.submission.mem}</td>
69+
<td>{state.driverDescription.submissionId}</td>
70+
<td>{state.driverDescription.submissionDate}</td>
71+
<td>{state.driverDescription.command.mainClass}</td>
72+
<td>cpus: {state.driverDescription.cores}, mem: {state.driverDescription.mem}</td>
7373
<td>{state.startDate}</td>
7474
<td>{state.slaveId.getValue}</td>
75-
<td>{stateString(state.taskState)}</td>
75+
<td>{stateString(state.mesosTaskStatus)}</td>
7676
</tr>
7777
}
7878

core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.ui.{SparkUI, WebUI}
2525
/**
2626
* UI that displays driver results from the [[org.apache.spark.deploy.mesos.MesosClusterDispatcher]]
2727
*/
28-
private [spark] class MesosClusterUI(
28+
private[spark] class MesosClusterUI(
2929
securityManager: SecurityManager,
3030
port: Int,
3131
conf: SparkConf,
@@ -43,6 +43,6 @@ private [spark] class MesosClusterUI(
4343
}
4444
}
4545

46-
private[spark] object MesosClusterUI {
46+
private object MesosClusterUI {
4747
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
4848
}

core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ private[spark] object RestSubmissionClient {
332332

333333
def main(args: Array[String]): Unit = {
334334
if (args.size < 2) {
335-
sys.error("Usage: RestClient [app resource] [main class] [app args*]")
335+
sys.error("Usage: RestSubmissionClient [app resource] [main class] [app args*]")
336336
sys.exit(1)
337337
}
338338
val appResource = args(0)

core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ import org.apache.spark.util.Utils
4747
* server error.
4848
*/
4949
private[spark] abstract class RestSubmissionServer(
50-
val host: String,
51-
val requestedPort: Int,
52-
val masterConf: SparkConf) extends Logging {
50+
val host: String,
51+
val requestedPort: Int,
52+
val masterConf: SparkConf) extends Logging {
5353
protected val submitRequestServlet: SubmitRequestServlet
5454
protected val killRequestServlet: KillRequestServlet
5555
protected val statusRequestServlet: StatusRequestServlet
@@ -190,8 +190,7 @@ private[rest] abstract class RestServlet extends HttpServlet with Logging {
190190
/**
191191
* A servlet for handling kill requests passed to the [[RestSubmissionServer]].
192192
*/
193-
private[rest] abstract class KillRequestServlet
194-
extends RestServlet {
193+
private[rest] abstract class KillRequestServlet extends RestServlet {
195194

196195
/**
197196
* If a submission ID is specified in the URL, have the Master kill the corresponding

core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,13 @@ private[deploy] class StandaloneRestServer(
5656
masterActor: ActorRef,
5757
masterUrl: String)
5858
extends RestSubmissionServer(host, requestedPort, masterConf) {
59-
val submitRequestServlet = new StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf)
60-
val killRequestServlet = new StandaloneKillRequestServlet(masterActor, masterConf)
61-
val statusRequestServlet = new StandaloneStatusRequestServlet(masterActor, masterConf)
59+
60+
protected override val submitRequestServlet =
61+
new StandaloneSubmitRequestServlet(masterActor, masterUrl, masterConf)
62+
protected override val killRequestServlet =
63+
new StandaloneKillRequestServlet(masterActor, masterConf)
64+
protected override val statusRequestServlet =
65+
new StandaloneStatusRequestServlet(masterActor, masterConf)
6266
}
6367

6468
/**

core/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,21 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
3636
* All requests are forwarded to
3737
* [[org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler]].
3838
* This is intended to be used in Mesos cluster mode only.
39-
* For more details about the RestServer Spark protocol and status codes please refer to
40-
* [[RestSubmissionServer]] javadocs.
39+
* For more details about the REST submission please refer to [[RestSubmissionServer]] javadocs.
4140
*/
4241
private[spark] class MesosRestServer(
4342
host: String,
4443
requestedPort: Int,
4544
masterConf: SparkConf,
4645
scheduler: MesosClusterScheduler)
4746
extends RestSubmissionServer(host, requestedPort, masterConf) {
48-
protected val submitRequestServlet = new MesosSubmitRequestServlet(scheduler, masterConf)
49-
protected val killRequestServlet = new MesosKillRequestServlet(scheduler, masterConf)
50-
protected val statusRequestServlet = new MesosStatusRequestServlet(scheduler, masterConf)
47+
48+
protected override val submitRequestServlet =
49+
new MesosSubmitRequestServlet(scheduler, masterConf)
50+
protected override val killRequestServlet =
51+
new MesosKillRequestServlet(scheduler, masterConf)
52+
protected override val statusRequestServlet =
53+
new MesosStatusRequestServlet(scheduler, masterConf)
5154
}
5255

5356
private[deploy] class MesosSubmitRequestServlet(
@@ -59,7 +62,7 @@ private[deploy] class MesosSubmitRequestServlet(
5962
private val DEFAULT_MEMORY = 512 // mb
6063
private val DEFAULT_CORES = 1.0
6164

62-
private val nextDriverNumber: AtomicLong = new AtomicLong(0)
65+
private val nextDriverNumber = new AtomicLong(0)
6366
private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
6467
private def newDriverId(submitDate: Date): String = {
6568
"driver-%s-%04d".format(
@@ -93,9 +96,9 @@ private[deploy] class MesosSubmitRequestServlet(
9396
val appArgs = request.appArgs
9497
val environmentVariables = request.environmentVariables
9598
val name = request.sparkProperties.get("spark.app.name").getOrElse(mainClass)
99+
96100
// Construct driver description
97-
val conf = new SparkConf(false)
98-
.setAll(sparkProperties)
101+
val conf = new SparkConf(false).setAll(sparkProperties)
99102
val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
100103
val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
101104
val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
@@ -106,7 +109,6 @@ private[deploy] class MesosSubmitRequestServlet(
106109
val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
107110
val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
108111
val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
109-
110112
val submitDate = new Date()
111113
val submissionId = newDriverId(submitDate)
112114

@@ -149,7 +151,7 @@ private[deploy] class MesosKillRequestServlet(scheduler: MesosClusterScheduler,
149151
private[deploy] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
150152
extends StatusRequestServlet {
151153
protected override def handleStatus(submissionId: String): SubmissionStatusResponse = {
152-
val d = scheduler.getStatus(submissionId)
154+
val d = scheduler.getDriverStatus(submissionId)
153155
d.serverSparkVersion = sparkVersion
154156
d
155157
}

0 commit comments

Comments
 (0)