Skip to content

Commit 1553230

Browse files
committed
Address review comments.
1 parent c6c6b73 commit 1553230

22 files changed

+548
-700
lines changed

core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.deploy
1919

20-
private[spark] case class DriverDescription(
20+
private[deploy] class DriverDescription(
2121
val jarUrl: String,
2222
val mem: Int,
2323
val cores: Int,

core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.json4s._
3232
import org.json4s.jackson.JsonMethods
3333

3434
import org.apache.spark.{Logging, SparkConf, SparkContext}
35-
import org.apache.spark.deploy.master.{RecoveryState}
35+
import org.apache.spark.deploy.master.RecoveryState
3636
import org.apache.spark.util.Utils
3737

3838
/**

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ object SparkSubmit {
117117
* Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
118118
*/
119119
private def kill(args: SparkSubmitArguments): Unit = {
120-
new RestClient()
120+
new RestSubmissionClient()
121121
.killSubmission(args.master, args.submissionToKill)
122122
}
123123

@@ -126,7 +126,7 @@ object SparkSubmit {
126126
* Standalone and Mesos cluster mode only.
127127
*/
128128
private def requestStatus(args: SparkSubmitArguments): Unit = {
129-
new RestClient()
129+
new RestSubmissionClient()
130130
.requestSubmissionStatus(args.master, args.submissionToRequestStatusFor)
131131
}
132132

core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.deploy.master
1919

20-
private[spark] object DriverState extends Enumeration {
20+
private[deploy] object DriverState extends Enumeration {
2121

2222
type DriverState = Value
2323

core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala

Lines changed: 18 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -17,53 +17,53 @@
1717

1818
package org.apache.spark.deploy.mesos
1919

20-
import java.io.File
2120
import java.util.concurrent.CountDownLatch
2221

2322
import org.apache.spark
24-
import org.apache.spark.deploy.mesos.MesosClusterDispatcher.ClusterDispatcherArguments
2523
import org.apache.spark.deploy.mesos.ui.MesosClusterUI
2624
import org.apache.spark.deploy.rest.mesos.MesosRestServer
2725
import org.apache.spark.scheduler.cluster.mesos._
28-
import org.apache.spark.util.{IntParam, SignalLogger, Utils}
26+
import org.apache.spark.util.SignalLogger
2927
import org.apache.spark.{Logging, SecurityManager, SparkConf}
3028

3129
/*
32-
* A dispatcher that is responsible for managing and launching drivers, and is intended to
33-
* be used for Mesos cluster mode. The dispatcher is launched by the user in the cluster,
34-
* which it launches a [[MesosRestServer]] for listening for driver requests, and launches a
35-
* [[MesosClusterScheduler]] to launch these drivers in the Mesos cluster.
30+
* A dispatcher that is responsible for managing and launching drivers, and is intended to be
31+
* used for Mesos cluster mode. The dispatcher is a long-running process started by the user in
32+
* the cluster independently of Spark applications.
33+
* It contains a [[MesosRestServer]] that listens for requests to submit drivers and a
34+
* [[MesosClusterScheduler]] that processes these requests by negotiating with the Mesos master
35+
* for resources.
3636
*
3737
* A typical new driver lifecycle is the following:
38-
*
3938
* - Driver submitted via spark-submit talking to the [[MesosRestServer]]
4039
* - [[MesosRestServer]] queues the driver request to [[MesosClusterScheduler]]
4140
* - [[MesosClusterScheduler]] gets resource offers and launches the drivers that are in queue
4241
*
4342
* This dispatcher supports both Mesos fine-grain or coarse-grain mode as the mode is configurable
4443
* per driver launched.
4544
* This class is needed since Mesos doesn't manage frameworks, so the dispatcher acts as
46-
* a daemon to launch drivers as Mesos frameworks upon request.
45+
* a daemon to launch drivers as Mesos frameworks upon request. The dispatcher is also started and
46+
* stopped by sbin/start-mesos-dispatcher and sbin/stop-mesos-dispatcher respectively.
4747
*/
4848
private[mesos] class MesosClusterDispatcher(
49-
args: ClusterDispatcherArguments,
49+
args: MesosClusterDispatcherArguments,
5050
conf: SparkConf)
5151
extends Logging {
5252

53-
private def publicAddress(conf: SparkConf, host: String): String = {
53+
private def publicAddress(conf: SparkConf, defaultAddress: String): String = {
5454
val envVar = conf.getenv("SPARK_PUBLIC_DNS")
55-
if (envVar != null) envVar else host
55+
if (envVar != null) envVar else defaultAddress
5656
}
5757

58-
private val recoveryMode = conf.get("spark.deploy.recoveryMode", "NONE").toUpperCase()
58+
private val recoveryMode = conf.get("spark.mesos.deploy.recoveryMode", "NONE").toUpperCase()
5959
logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
6060

6161
private val engineFactory = recoveryMode match {
6262
case "NONE" => new BlackHoleMesosClusterPersistenceEngineFactory
6363
case "ZOOKEEPER" => new ZookeeperMesosClusterPersistenceEngineFactory(conf)
6464
}
6565

66-
private val scheduler = new MesosClusterSchedulerDriver(engineFactory, conf)
66+
private val scheduler = new MesosClusterScheduler(engineFactory, conf)
6767

6868
private val server = new MesosRestServer(args.host, args.port, conf, scheduler)
6969
private val webUi = new MesosClusterUI(
@@ -74,7 +74,6 @@ private[mesos] class MesosClusterDispatcher(
7474
scheduler)
7575

7676
private val shutdownLatch = new CountDownLatch(1)
77-
private val sparkHome = new File(Option(conf.getenv("SPARK_HOME")).getOrElse("."))
7877

7978
def start(): Unit = {
8079
webUi.bind()
@@ -98,110 +97,26 @@ private[mesos] class MesosClusterDispatcher(
9897
private[mesos] object MesosClusterDispatcher extends spark.Logging {
9998
def main(args: Array[String]) {
10099
SignalLogger.register(log)
101-
102100
val conf = new SparkConf
103-
val dispatcherArgs = new ClusterDispatcherArguments(args, conf)
104-
101+
val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
105102
conf.setMaster(dispatcherArgs.masterUrl)
106-
conf.setAppName("Spark Cluster")
107-
103+
conf.setAppName(dispatcherArgs.name)
108104
dispatcherArgs.zookeeperUrl.foreach { z =>
109-
conf.set("spark.deploy.recoveryMode", "ZOOKEEPER")
110-
conf.set("spark.deploy.zookeeper.url", z)
105+
conf.set("spark.mesos.deploy.recoveryMode", "ZOOKEEPER")
106+
conf.set("spark.mesos.deploy.zookeeper.url", z)
111107
}
112-
113108
val dispatcher = new MesosClusterDispatcher(
114109
dispatcherArgs,
115110
conf)
116-
117111
dispatcher.start()
118-
119112
val shutdownHook = new Thread() {
120113
override def run() {
121114
logInfo("Shutdown hook is shutting down dispatcher")
122115
dispatcher.stop()
123116
dispatcher.awaitShutdown()
124117
}
125118
}
126-
127119
Runtime.getRuntime.addShutdownHook(shutdownHook)
128-
129120
dispatcher.awaitShutdown()
130121
}
131-
132-
private class ClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
133-
var host = Utils.localHostName()
134-
var port = 7077
135-
var webUiPort = 8081
136-
var masterUrl: String = _
137-
var zookeeperUrl: Option[String] = None
138-
var propertiesFile: String = _
139-
140-
parse(args.toList)
141-
142-
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
143-
144-
private def parse(args: List[String]): Unit = args match {
145-
case ("--host" | "-h") :: value :: tail =>
146-
Utils.checkHost(value, "Please use hostname " + value)
147-
host = value
148-
parse(tail)
149-
150-
case ("--port" | "-p") :: IntParam(value) :: tail =>
151-
port = value
152-
parse(tail)
153-
154-
case ("--webui-port" | "-p") :: IntParam(value) :: tail =>
155-
webUiPort = value
156-
parse(tail)
157-
158-
case ("--zk" | "-z") :: value :: tail =>
159-
zookeeperUrl = Some(value)
160-
parse(tail)
161-
162-
case ("--master" | "-m") :: value :: tail =>
163-
if (!value.startsWith("mesos://")) {
164-
System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
165-
System.exit(1)
166-
}
167-
masterUrl = value.stripPrefix("mesos://")
168-
parse(tail)
169-
170-
case ("--properties-file") :: value :: tail =>
171-
propertiesFile = value
172-
parse(tail)
173-
174-
case ("--help") :: tail =>
175-
printUsageAndExit(0)
176-
177-
case Nil => {
178-
if (masterUrl == null) {
179-
System.err.println("--master is required")
180-
System.exit(1)
181-
}
182-
}
183-
184-
case _ =>
185-
printUsageAndExit(1)
186-
}
187-
188-
/**
189-
* Print usage and exit JVM with the given exit code.
190-
*/
191-
def printUsageAndExit(exitCode: Int): Unit = {
192-
System.err.println(
193-
"Usage: MesosClusterDispatcher [options]\n" +
194-
"\n" +
195-
"Options:\n" +
196-
" -h HOST, --host HOST Hostname to listen on\n" +
197-
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
198-
" --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
199-
" -m --master MASTER URI for connecting to Mesos master\n" +
200-
" -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" +
201-
" Zookeeper for persistence\n" +
202-
" --properties-file FILE Path to a custom Spark properties file.\n" +
203-
" Default is conf/spark-defaults.conf.")
204-
System.exit(exitCode)
205-
}
206-
}
207122
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.deploy.mesos
19+
20+
import org.apache.spark.SparkConf
21+
import org.apache.spark.util.{IntParam, Utils}
22+
23+
24+
private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
25+
var host = Utils.localHostName()
26+
var port = 7077
27+
var name = "Spark Cluster"
28+
var webUiPort = 8081
29+
var masterUrl: String = _
30+
var zookeeperUrl: Option[String] = None
31+
var propertiesFile: String = _
32+
33+
parse(args.toList)
34+
35+
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
36+
37+
private def parse(args: List[String]): Unit = args match {
38+
case ("--host" | "-h") :: value :: tail =>
39+
Utils.checkHost(value, "Please use hostname " + value)
40+
host = value
41+
parse(tail)
42+
43+
case ("--port" | "-p") :: IntParam(value) :: tail =>
44+
port = value
45+
parse(tail)
46+
47+
case ("--webui-port" | "-p") :: IntParam(value) :: tail =>
48+
webUiPort = value
49+
parse(tail)
50+
51+
case ("--zk" | "-z") :: value :: tail =>
52+
zookeeperUrl = Some(value)
53+
parse(tail)
54+
55+
case ("--master" | "-m") :: value :: tail =>
56+
if (!value.startsWith("mesos://")) {
57+
System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
58+
System.exit(1)
59+
}
60+
masterUrl = value.stripPrefix("mesos://")
61+
parse(tail)
62+
63+
case ("--name") :: value :: tail =>
64+
name = value
65+
parse(tail)
66+
67+
case ("--properties-file") :: value :: tail =>
68+
propertiesFile = value
69+
parse(tail)
70+
71+
case ("--help") :: tail =>
72+
printUsageAndExit(0)
73+
74+
case Nil => {
75+
if (masterUrl == null) {
76+
System.err.println("--master is required")
77+
System.exit(1)
78+
}
79+
}
80+
81+
case _ =>
82+
printUsageAndExit(1)
83+
}
84+
85+
private def printUsageAndExit(exitCode: Int): Unit = {
86+
System.err.println(
87+
"Usage: MesosClusterDispatcher [options]\n" +
88+
"\n" +
89+
"Options:\n" +
90+
" -h HOST, --host HOST Hostname to listen on\n" +
91+
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
92+
" --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
93+
" --name NAME Framework name to show in Mesos UI\n" +
94+
" -m --master MASTER URI for connecting to Mesos master\n" +
95+
" -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" +
96+
" Zookeeper for persistence\n" +
97+
" --properties-file FILE Path to a custom Spark properties file.\n" +
98+
" Default is conf/spark-defaults.conf.")
99+
System.exit(exitCode)
100+
}
101+
}

core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.mesos
2020
import java.util.Date
2121

2222
import org.apache.spark.deploy.Command
23+
import org.apache.spark.scheduler.cluster.mesos.RetryState
2324

2425
/**
2526
* Describes a Spark driver that is submitted from the
@@ -32,18 +33,31 @@ import org.apache.spark.deploy.Command
3233
* @param command The command to launch the driver.
3334
* @param schedulerProperties Extra properties to pass the Mesos scheduler
3435
*/
35-
private[spark] case class MesosDriverDescription(
36+
private[spark] class MesosDriverDescription(
3637
val name: String,
3738
val jarUrl: String,
3839
val mem: Int,
3940
val cores: Double,
4041
val supervise: Boolean,
4142
val command: Command,
42-
val schedulerProperties: Map[String, String])
43+
val schedulerProperties: Map[String, String],
44+
val submissionId: String,
45+
val submissionDate: Date,
46+
val retryState: Option[RetryState] = None)
4347
extends Serializable {
44-
45-
var submissionId: Option[String] = None
46-
var submissionDate: Option[Date] = None
47-
48+
def copy(
49+
name: String = name,
50+
jarUrl: String = jarUrl,
51+
mem: Int = mem,
52+
cores: Double = cores,
53+
supervise: Boolean = supervise,
54+
command: Command = command,
55+
schedulerProperties: Map[String, String] = schedulerProperties,
56+
retryState: Option[RetryState] = retryState,
57+
submissionId: String = submissionId,
58+
submissionDate: Date = submissionDate): MesosDriverDescription = {
59+
new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, schedulerProperties,
60+
submissionId, submissionDate, retryState)
61+
}
4862
override def toString: String = s"MesosDriverDescription (${command.mainClass})"
4963
}

0 commit comments

Comments
 (0)