
Commit 371ce65

Handle cluster mode recovery and state persistence.
1 parent 3d4dfa1 commit 371ce65

21 files changed: +755 -201 lines

core/src/main/scala/org/apache/spark/deploy/master/SparkCuratorUtil.scala renamed to core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala

Lines changed: 2 additions & 2 deletions
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy.master
+package org.apache.spark.deploy
 
 import scala.collection.JavaConversions._
 
@@ -25,7 +25,7 @@ import org.apache.zookeeper.KeeperException
 
 import org.apache.spark.{Logging, SparkConf}
 
-private[deploy] object SparkCuratorUtil extends Logging {
+private[spark] object SparkCuratorUtil extends Logging {
 
   private val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
   private val ZK_SESSION_TIMEOUT_MILLIS = 60000
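
Note: the rename moves the Curator helper out of the standalone Master's package and widens its visibility from private[deploy] to private[spark], so the Mesos cluster-mode code introduced below can reuse the same ZooKeeper client plumbing. For orientation, a minimal sketch of the kind of client these two timeout constants feed, assuming the connect string comes from spark.deploy.zookeeper.url; the retry values are illustrative, not taken from this commit:

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry

object CuratorClientSketch {
  private val ZK_CONNECTION_TIMEOUT_MILLIS = 15000
  private val ZK_SESSION_TIMEOUT_MILLIS = 60000
  private val RETRY_WAIT_MILLIS = 5000      // illustrative
  private val MAX_RECONNECT_ATTEMPTS = 3    // illustrative

  // Build and start a Curator client for a ZooKeeper connect string,
  // e.g. the value of spark.deploy.zookeeper.url.
  def newClient(zkUrl: String): CuratorFramework = {
    val client = CuratorFrameworkFactory.newClient(zkUrl,
      ZK_SESSION_TIMEOUT_MILLIS, ZK_CONNECTION_TIMEOUT_MILLIS,
      new ExponentialBackoffRetry(RETRY_WAIT_MILLIS, MAX_RECONNECT_ATTEMPTS))
    client.start()
    client
  }
}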

core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.master
 
-private[deploy] object DriverState extends Enumeration {
+private[spark] object DriverState extends Enumeration {
 
   type DriverState = Value
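
Note: widening DriverState to private[spark] lets the Mesos scheduler report driver status through the same enumeration the standalone master uses. For orientation, the enumeration's shape is sketched below; the value list reflects the standalone master's states and is shown for context, not changed by this commit:

private[spark] object DriverState extends Enumeration {
  type DriverState = Value

  // SUBMITTED: received but not yet scheduled
  // RUNNING: allocated to a worker and running
  // RELAUNCHING: exited non-zero or lost its worker, waiting to be rescheduled
  // UNKNOWN: belongs to a recovered worker that has not yet re-registered
  // KILLED: stopped at the user's request
  // FINISHED / FAILED / ERROR: terminal states
  val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value
}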

core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.leader.{LeaderLatchListener, LeaderLatch}
+import org.apache.spark.deploy.SparkCuratorUtil
 
 private[master] class ZooKeeperLeaderElectionAgent(val masterActor: LeaderElectable,
     conf: SparkConf) extends LeaderLatchListener with LeaderElectionAgent with Logging {

core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.curator.framework.CuratorFramework
 import org.apache.zookeeper.CreateMode
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.deploy.SparkCuratorUtil
 
 
 private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serialization: Serialization)

core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala

Lines changed: 75 additions & 43 deletions
@@ -18,15 +18,16 @@
 package org.apache.spark.deploy.mesos
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.util.{IntParam, Utils}
+import org.apache.spark.util.{SignalLogger, IntParam, Utils}
 
 import java.io.File
-import java.util.concurrent.CountDownLatch
+import java.util.concurrent.{TimeUnit, CountDownLatch}
 
 import org.apache.spark.deploy.mesos.ui.MesosClusterUI
 import org.apache.spark.deploy.rest.MesosRestServer
-import org.apache.spark.scheduler.cluster.mesos.{ClusterScheduler, MesosClusterScheduler}
-
+import org.apache.spark.scheduler.cluster.mesos._
+import org.apache.spark.deploy.mesos.MesosClusterDispatcher.ClusterDispatcherArguments
+import org.apache.mesos.state.{ZooKeeperState, InMemoryState}
 
 /*
  * A dispatcher actor that is responsible for managing drivers, that is intended to
@@ -35,76 +36,101 @@ import org.apache.spark.scheduler.cluster.mesos.{ClusterScheduler, MesosClusterS
  * a daemon to launch drivers as Mesos frameworks upon request.
  */
 private [spark] class MesosClusterDispatcher(
-    host: String,
-    serverPort: Int,
-    conf: SparkConf,
-    webUi: MesosClusterUI,
-    scheduler: ClusterScheduler) extends Logging {
+    args: ClusterDispatcherArguments,
+    conf: SparkConf) extends Logging {
+
+  def dispatcherPublicAddress(conf: SparkConf, host: String): String = {
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
+    if (envVar != null) envVar else host
+  }
+
+  val recoveryMode = conf.get("spark.deploy.recoveryMode", "NONE").toUpperCase()
+  logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
+
+  val engineFactory = recoveryMode match {
+    case "NONE" => new BlackHolePersistenceEngineFactory
+    case "ZOOKEEPER" => {
+      new ZookeeperClusterPersistenceEngineFactory(conf)
+    }
+  }
 
-  val server = new MesosRestServer(host, serverPort, conf, scheduler)
+  val scheduler = new MesosClusterScheduler(
+    engineFactory,
+    conf)
+
+  val server = new MesosRestServer(args.host, args.port, conf, scheduler)
+
+  val webUi = new MesosClusterUI(
+    new SecurityManager(conf),
+    args.webUiPort,
+    conf,
+    dispatcherPublicAddress(conf, args.host),
+    scheduler)
+
+  val shutdownLatch = new CountDownLatch(1)
 
   val sparkHome =
     new File(sys.env.get("SPARK_HOME").getOrElse("."))
 
   def start() {
+    webUi.bind()
+    scheduler.frameworkUrl = webUi.activeWebUiUrl
+    scheduler.start()
     server.start()
-    // We assume web ui is already started as the scheduler needs the bound port.
+  }
+
+  def awaitShutdown() {
+    shutdownLatch.await()
   }
 
   def stop() {
     webUi.stop()
     server.stop()
+    scheduler.stop()
+    shutdownLatch.countDown()
   }
 }
 
-object MesosClusterDispatcher {
-  def dispatcherPublicAddress(conf: SparkConf, host: String): String = {
-    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
-    if (envVar != null) envVar else host
-  }
-
-  val shutdownLatch = new CountDownLatch(1)
-
+private[mesos] object MesosClusterDispatcher extends Logging {
   def main(args: Array[String]) {
+    SignalLogger.register(log)
+
     val conf = new SparkConf
     val dispatcherArgs = new ClusterDispatcherArguments(args, conf)
+
     conf.setMaster(dispatcherArgs.masterUrl)
     conf.setAppName("Mesos Cluster Dispatcher")
-    val scheduler = new MesosClusterScheduler(conf)
-
-    // We have to create the webui and bind it early as we need to
-    // pass the framework web ui url to Mesos which is before the
-    // scheduler starts.
-    val webUi =
-      new MesosClusterUI(
-        new SecurityManager(conf),
-        dispatcherArgs.webUiPort,
-        conf,
-        dispatcherPublicAddress(conf, dispatcherArgs.host),
-        scheduler)
 
-    webUi.bind()
+    val dispatcher = new MesosClusterDispatcher(
+      dispatcherArgs,
+      conf)
 
-    scheduler.frameworkUrl = webUi.activeWebUiUrl
-    scheduler.start()
-    new MesosClusterDispatcher(
-      dispatcherArgs.host,
-      dispatcherArgs.port,
-      conf,
-      webUi,
-      scheduler).start()
+    dispatcher.start()
 
-    shutdownLatch.await()
+    val shutdownHook = new Thread() {
+      override def run() {
+        logInfo("Shutdown hook is shutting down dispatcher")
+        dispatcher.stop()
+        dispatcher.awaitShutdown()
+      }
+    }
+
+    Runtime.getRuntime.addShutdownHook(shutdownHook)
+
+    dispatcher.awaitShutdown()
   }
 
   class ClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
     var host = Utils.localHostName()
     var port = 7077
     var webUiPort = 8081
-    var masterUrl: String = null
+    var masterUrl: String = _
+    var propertiesFile: String = _
 
     parse(args.toList)
 
+    propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
    def parse(args: List[String]): Unit = args match {
      case ("--host" | "-h") :: value :: tail =>
        Utils.checkHost(value, "Please use hostname " + value)
@@ -127,6 +153,10 @@ object MesosClusterDispatcher {
         masterUrl = value.stripPrefix("mesos://")
         parse(tail)
 
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
       case ("--help") :: tail =>
         printUsageAndExit(0)
 
@@ -152,7 +182,9 @@ object MesosClusterDispatcher {
       " -h HOST, --host HOST     Hostname to listen on\n" +
       " -p PORT, --port PORT     Port to listen on (default: 7077)\n" +
       " --webui-port WEBUI_PORT  WebUI Port to listen on (default: 8081)\n" +
-      " -m --master MASTER       URI for connecting to Mesos master\n")
+      " -m --master MASTER       URI for connecting to Mesos master\n" +
+      " --properties-file FILE   Path to a custom Spark properties file.\n" +
+      "                          Default is conf/spark-defaults.conf.")
     System.exit(exitCode)
   }
 }
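
Note: two details of the rewritten dispatcher are worth calling out. The engineFactory match has no default case, so an unrecognized spark.deploy.recoveryMode fails fast with a MatchError at construction time rather than silently running without persistence. And main now installs a JVM shutdown hook instead of only blocking on the latch, so a SIGTERM tears down the web UI, REST server, and scheduler before the process exits. A stripped-down sketch of that latch-plus-hook pattern, using illustrative names independent of the Spark classes above:

import java.util.concurrent.CountDownLatch

// Minimal model of the dispatcher lifecycle: start() brings services up,
// stop() tears them down and releases anyone blocked in awaitShutdown().
class DaemonSketch {
  private val shutdownLatch = new CountDownLatch(1)

  def start(): Unit = println("services started")

  def stop(): Unit = {
    println("services stopped")
    shutdownLatch.countDown()  // wake threads blocked in awaitShutdown()
  }

  def awaitShutdown(): Unit = shutdownLatch.await()
}

object DaemonSketch {
  def main(args: Array[String]): Unit = {
    val daemon = new DaemonSketch
    daemon.start()
    // On SIGTERM/SIGINT the hook runs stop(), counting the latch down so
    // the main thread falls out of awaitShutdown() and exits cleanly.
    Runtime.getRuntime.addShutdownHook(new Thread() {
      override def run(): Unit = daemon.stop()
    })
    daemon.awaitShutdown()
  }
}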
core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import org.apache.spark.deploy.DriverDescription
+import scala.collection.mutable
+
+private[spark] class MesosDriverDescription(
+    val desc: DriverDescription,
+    val schedulerProperties: mutable.HashMap[String, String]) extends Serializable {
+}
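
Note: the new wrapper keeps the generic DriverDescription and the Mesos-specific scheduler properties together, so both serialize as a single unit into whichever persistence engine the dispatcher is using and survive failover. A hypothetical construction for illustration; the jar URL, resource numbers, and the six-field Command constructor shown here are assumptions mirroring how MesosRestServer builds one below:

import scala.collection.mutable
import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.deploy.mesos.MesosDriverDescription

object MesosDriverDescriptionExample {
  // Illustrative values: jar URL, memory in MB, cores, supervise flag, command.
  val desc = new DriverDescription(
    "hdfs://namenode:8020/jars/app.jar", 1024, 1, true,
    new Command("org.example.Main", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq.empty))

  val props = new mutable.HashMap[String, String]
  props("spark.app.name") = "example-app"

  val mesosDesc = new MesosDriverDescription(desc, props)
}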

core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverOutputPage.scala

Lines changed: 4 additions & 4 deletions
@@ -30,12 +30,13 @@ class DriverOutputPage(parent: MesosClusterUI) extends WebUIPage("") {
 
     val queuedHeaders = Seq("DriverID", "Submit Date", "Description")
     val driverHeaders = queuedHeaders ++
-      Seq("Start Date", "Mesos Slave ID", "Mesos Task ID", "State")
+      Seq("Start Date", "Mesos Slave ID", "State")
 
     val queuedTable = UIUtils.listingTable(queuedHeaders, queuedRow, state.queuedDrivers)
     val launchedTable = UIUtils.listingTable(driverHeaders, driverRow, state.launchedDrivers)
     val finishedTable = UIUtils.listingTable(driverHeaders, driverRow, state.finishedDrivers)
     val content =
+      <p>Mesos Framework ID: {state.appId}</p>
       <div class="row-fluid">
         <div class="span12">
           <h4>Queued Drivers:</h4>
@@ -53,18 +54,17 @@
     <tr>
       <td>{submission.submissionId}</td>
       <td>{submission.submitDate}</td>
-      <td>{submission.req.desc.command.mainClass}</td>
+      <td>{submission.desc.desc.command.mainClass}</td>
     </tr>
   }
 
   def driverRow(state: ClusterTaskState): Seq[Node] = {
     <tr>
       <td>{state.submission.submissionId}</td>
       <td>{state.submission.submitDate}</td>
-      <td>{state.submission.req.desc.command.mainClass}</td>
+      <td>{state.submission.desc.desc.command.mainClass}</td>
       <td>{state.startDate}</td>
       <td>{state.slaveId.getValue}</td>
-      <td>{state.taskId.getValue}</td>
       <td>{stateString(state.taskState)}</td>
     </tr>
   }

core/src/main/scala/org/apache/spark/deploy/mesos/ui/MesosClusterUI.scala

Lines changed: 1 addition & 2 deletions
@@ -21,7 +21,6 @@ import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.SparkConf
 import org.apache.spark.SecurityManager
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.deploy.worker.ui.ActiveWebUiUrlAccessor
 import org.apache.spark.scheduler.cluster.mesos.ClusterScheduler
 
 /**
@@ -33,7 +32,7 @@ private [spark] class MesosClusterUI(
     conf: SparkConf,
     dispatcherPublicAddress: String,
     val scheduler: ClusterScheduler)
-  extends WebUI(securityManager, port, conf) with ActiveWebUiUrlAccessor {
+  extends WebUI(securityManager, port, conf) {
 
   initialize()

core/src/main/scala/org/apache/spark/deploy/rest/MesosRestServer.scala

Lines changed: 27 additions & 6 deletions
@@ -26,7 +26,8 @@ import org.apache.spark.deploy.Command
 import org.apache.spark.{SparkConf, SPARK_VERSION => sparkVersion}
 import org.apache.spark.util.Utils
 import org.apache.spark.scheduler.cluster.mesos.ClusterScheduler
-import org.apache.spark.scheduler.cluster.mesos.DriverRequest
+import scala.collection.mutable
+import org.apache.spark.deploy.mesos.MesosDriverDescription
 
 /**
  * A server that responds to requests submitted by the [[RestClient]].
@@ -74,7 +75,7 @@ class MesosSubmitRequestServlet(
    * This does not currently consider fields used by python applications since python
    * is not supported in mesos cluster mode yet.
    */
-  private def buildDriverRequest(request: CreateSubmissionRequest): DriverRequest = {
+  private def buildDriverDescription(request: CreateSubmissionRequest): MesosDriverDescription = {
     // Required fields, including the main class because python is not yet supported
     val appResource = Option(request.appResource).getOrElse {
       throw new SubmitRestMissingFieldException("Application jar is missing.")
@@ -93,6 +94,25 @@ class MesosSubmitRequestServlet(
     val driverCores = sparkProperties.get("spark.driver.cores")
     val appArgs = request.appArgs
     val environmentVariables = request.environmentVariables
+    val schedulerProperties = new mutable.HashMap[String, String]
+    // Store Spark submit specific arguments here to pass to the scheduler.
+    schedulerProperties("spark.app.name") = sparkProperties.getOrElse("spark.app.name", mainClass)
+
+    sparkProperties.get("spark.executor.memory").foreach {
+      v => schedulerProperties("spark.executor.memory") = v
+    }
+
+    sparkProperties.get("spark.cores.max").foreach {
+      v => schedulerProperties("spark.cores.max") = v
+    }
+
+    sparkProperties.get("spark.executor.uri").foreach {
+      v => schedulerProperties("spark.executor.uri") = v
+    }
+
+    sparkProperties.get("spark.mesos.executor.home").foreach {
+      v => schedulerProperties("spark.mesos.executor.home") = v
+    }
 
     // Construct driver description
     val conf = new SparkConf(false)
@@ -109,9 +129,10 @@ class MesosSubmitRequestServlet(
     val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
     val actualDriverCores = driverCores.map(_.toInt).getOrElse(DEFAULT_CORES)
 
-    DriverRequest(new DriverDescription(
-      appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command),
-      conf)
+    val desc = new DriverDescription(
+      appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
+
+    new MesosDriverDescription(desc, schedulerProperties)
   }
 
   protected override def handleSubmit(
@@ -120,7 +141,7 @@
       responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
     requestMessage match {
       case submitRequest: CreateSubmissionRequest =>
-        val driverDescription = buildDriverRequest(submitRequest)
+        val driverDescription = buildDriverDescription(submitRequest)
         val response = scheduler.submitDriver(driverDescription)
         val submitResponse = new CreateSubmissionResponse
         submitResponse.serverSparkVersion = sparkVersion
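
Note: the four repeated .foreach blocks in buildDriverDescription act as an explicit whitelist of submit-time settings forwarded to the scheduler. A behavior-preserving sketch of the same logic folded into a loop (not part of the commit), which keeps the whitelist in one place if more keys are added later:

import scala.collection.mutable

object ForwardedPropertiesSketch {
  // The same keys the servlet copies one at a time.
  private val forwardedKeys = Seq(
    "spark.executor.memory",
    "spark.cores.max",
    "spark.executor.uri",
    "spark.mesos.executor.home")

  def build(
      sparkProperties: Map[String, String],
      mainClass: String): mutable.HashMap[String, String] = {
    val schedulerProperties = new mutable.HashMap[String, String]
    schedulerProperties("spark.app.name") =
      sparkProperties.getOrElse("spark.app.name", mainClass)
    // Copy only whitelisted keys that are actually present.
    for (key <- forwardedKeys; value <- sparkProperties.get(key)) {
      schedulerProperties(key) = value
    }
    schedulerProperties
  }
}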

core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala

Lines changed: 4 additions & 2 deletions
@@ -44,8 +44,7 @@ private[deploy] class DriverRunner(
     val sparkHome: File,
     val driverDesc: DriverDescription,
     val worker: ActorRef,
-    val workerUrl: String,
-    val securityManager: SecurityManager)
+    val workerUrl: String)
   extends Logging {
 
   @volatile private var process: Option[Process] = None
@@ -137,9 +136,12 @@
    * Will throw an exception if there are errors downloading the jar.
    */
   private def downloadUserJar(driverDir: File): String = {
+
     val jarPath = new Path(driverDesc.jarUrl)
 
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val jarFileSystem = jarPath.getFileSystem(hadoopConf)
+
     val destPath = new File(driverDir.getAbsolutePath, jarPath.getName)
     val jarFileName = jarPath.getName
     val localJarFile = new File(driverDir, jarFileName)
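
Note: the new jarFileSystem val resolves whichever Hadoop filesystem backs driverDesc.jarUrl (HDFS, S3, file://), which the rest of downloadUserJar needs in order to copy the jar next to the driver's working directory. A hedged sketch of the download step it feeds, using the standard Hadoop FileUtil.copy API; the method in this commit may differ in its details:

import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}

object JarFetchSketch {
  // Copy a remote jar into the driver's directory and return the local path.
  // Assumes hadoopConf is already configured for the target filesystem.
  def fetchJar(jarUrl: String, driverDir: File, hadoopConf: Configuration): String = {
    val jarPath = new Path(jarUrl)
    val jarFileSystem = jarPath.getFileSystem(hadoopConf)
    val localJarFile = new File(driverDir, jarPath.getName)
    if (!localJarFile.exists()) {
      // Arguments: source FS, source path, local destination, deleteSource, conf.
      FileUtil.copy(jarFileSystem, jarPath, localJarFile, false, hadoopConf)
    }
    localJarFile.getAbsolutePath
  }
}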
