
Commit d66c01f

mateiz authored and rxin committed
Merge pull request #19 from aarondav/master-zk
Standalone Scheduler fault tolerance using ZooKeeper

This patch implements full distributed fault tolerance for standalone scheduler Masters. There is only one master Leader at a time, which is actively serving scheduling requests. If this Leader crashes, another master will eventually be elected, reconstruct the state from the first Master, and continue serving scheduling requests.

Leader election is performed using the ZooKeeper leader election pattern. We try to minimize the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of retries and session monitoring on top of the ZooKeeper client.

Master failover follows directly from the single-node Master recovery via the file system (patch d5a96fe), save that the Master state is stored in ZooKeeper instead.

Configuration:
By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE).
By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled.
By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory to an appropriate directory accessible by the Master, we will keep the behavior from d5a96fe.

Additionally, places where a Master could be specified by a spark:// URL can now take comma-delimited lists to specify backup masters. Note that this is only used for registration of NEW Workers and application Clients. Once a Worker or Client has registered with the Master Leader, it is "in the system" and will never need to register again.

(cherry picked from commit c71499b)
Signed-off-by: Reynold Xin <[email protected]>
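For illustration, a minimal sketch of wiring this up, assuming the system-property-based configuration of this era; the hostnames and ports are hypothetical:

// On each Master, before launch (e.g. via SPARK_DAEMON_JAVA_OPTS):
//   -Dspark.deploy.recoveryMode=ZOOKEEPER
//   -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181

// In the driver, list every Master in one comma-delimited spark:// URL so a
// NEW application can register with whichever Master is currently the Leader:
import org.apache.spark.SparkContext

val sc = new SparkContext("spark://master1:7077,master2:7077", "FaultTolerantApp")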
1 parent 0fcb234 commit d66c01f


45 files changed: +1947 −175 lines changed

bin/stop-slaves.sh

Lines changed: 0 additions & 2 deletions

@@ -17,8 +17,6 @@
 # limitations under the License.
 #
 
-# Starts the master on the machine this script is executed on.
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 4 deletions

@@ -160,7 +160,7 @@ class SparkContext(
   // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
   // Regular expression for connecting to Spark deploy clusters
-  val SPARK_REGEX = """(spark://.*)""".r
+  val SPARK_REGEX = """spark://(.*)""".r
   //Regular expression for connection to Mesos cluster
   val MESOS_REGEX = """(mesos://.*)""".r
 
@@ -176,7 +176,8 @@ class SparkContext(
 
     case SPARK_REGEX(sparkUrl) =>
      val scheduler = new ClusterScheduler(this)
-      val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
+      val masterUrls = sparkUrl.split(",").map("spark://" + _)
+      val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
      scheduler.initialize(backend)
      scheduler
 
@@ -192,8 +193,8 @@ class SparkContext(
      val scheduler = new ClusterScheduler(this)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
-      val sparkUrl = localCluster.start()
-      val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
+      val masterUrls = localCluster.start()
+      val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
        localCluster.stop()
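The regex change moves the "spark://" scheme outside the capture group, so the captured string is a bare comma-delimited host list and the scheme is re-attached to each entry. A quick illustrative check (hostnames are hypothetical):

import scala.util.matching.Regex

val SPARK_REGEX: Regex = """spark://(.*)""".r

"spark://master1:7077,master2:7077" match {
  case SPARK_REGEX(sparkUrl) =>
    // Yields Array("spark://master1:7077", "spark://master2:7077")
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    println(masterUrls.mkString(", "))
  case other =>
    println("Not a spark:// URL: " + other)
}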

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 18 additions & 11 deletions

@@ -21,12 +21,14 @@ import scala.collection.immutable.List
 
 import org.apache.spark.deploy.ExecutorState.ExecutorState
 import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.master.RecoveryState.MasterState
 import org.apache.spark.deploy.worker.ExecutorRunner
 import org.apache.spark.util.Utils
 
 
 private[deploy] sealed trait DeployMessage extends Serializable
 
+/** Contains messages sent between Scheduler actor nodes. */
 private[deploy] object DeployMessages {
 
   // Worker to Master
@@ -52,17 +54,20 @@ private[deploy] object DeployMessages {
       exitStatus: Option[Int])
     extends DeployMessage
 
+  case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])
+
   case class Heartbeat(workerId: String) extends DeployMessage
 
   // Master to Worker
 
-  case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
+  case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage
 
   case class RegisterWorkerFailed(message: String) extends DeployMessage
 
-  case class KillExecutor(appId: String, execId: Int) extends DeployMessage
+  case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage
 
   case class LaunchExecutor(
+      masterUrl: String,
       appId: String,
       execId: Int,
       appDesc: ApplicationDescription,
@@ -76,9 +81,11 @@ private[deploy] object DeployMessages {
   case class RegisterApplication(appDescription: ApplicationDescription)
     extends DeployMessage
 
+  case class MasterChangeAcknowledged(appId: String)
+
   // Master to Client
 
-  case class RegisteredApplication(appId: String) extends DeployMessage
+  case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
 
   // TODO(matei): replace hostPort with host
   case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
@@ -94,14 +101,19 @@ private[deploy] object DeployMessages {
 
   case object StopClient
 
+  // Master to Worker & Client
+
+  case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
+
   // MasterWebUI To Master
 
   case object RequestMasterState
 
   // Master to MasterWebUI
 
   case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
-    activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+    activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
+    status: MasterState) {
 
     Utils.checkHost(host, "Required hostname")
     assert (port > 0)
@@ -123,12 +135,7 @@ private[deploy] object DeployMessages {
     assert (port > 0)
   }
 
-  // Actor System to Master
-
-  case object CheckForWorkerTimeOut
-
-  case object RequestWebUIPort
-
-  case class WebUIPortResponse(webUIBoundPort: Int)
+  // Actor System to Worker
 
+  case object SendHeartbeat
 }
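Taken together, the new messages define the failover handshake: a newly elected Master broadcasts MasterChanged to registered Workers and Clients, Workers answer with a WorkerSchedulerStateResponse describing the executors they are already running, and Clients answer with MasterChangeAcknowledged. The handlers themselves are not part of this excerpt; the following is only a hedged sketch of how the replies could be built from the new message types (the helper functions are hypothetical):

import org.apache.spark.deploy.ExecutorDescription
import org.apache.spark.deploy.DeployMessages.{
  MasterChanged, MasterChangeAcknowledged, WorkerSchedulerStateResponse}

// Worker side (sketch): report everything currently running so the new
// Master can rebuild its scheduling state, then re-point at msg.masterUrl.
def workerOnMasterChanged(
    workerId: String,
    running: List[ExecutorDescription],
    msg: MasterChanged): WorkerSchedulerStateResponse =
  WorkerSchedulerStateResponse(workerId, running)

// Client side (sketch): acknowledge with just the application id.
def clientOnMasterChanged(appId: String, msg: MasterChanged): MasterChangeAcknowledged =
  MasterChangeAcknowledged(appId)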
core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala

Lines changed: 34 additions & 0 deletions

@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+/**
+ * Used to send state on-the-wire about Executors from Worker to Master.
+ * This state is sufficient for the Master to reconstruct its internal data structures during
+ * failover.
+ */
+private[spark] class ExecutorDescription(
+    val appId: String,
+    val execId: Int,
+    val cores: Int,
+    val state: ExecutorState.Value)
+  extends Serializable {
+
+  override def toString: String =
+    "ExecutorState(appId=%s, execId=%d, cores=%d, state=%s)".format(appId, execId, cores, state)
+}
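For a sense of the wire format, a small hypothetical example (the app and worker ids are made up; ExecutorState.RUNNING is one of the existing executor states):

import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.WorkerSchedulerStateResponse

// Two executors this Worker is running for the same application.
val running = List(
  new ExecutorDescription("app-0042", 0, 4, ExecutorState.RUNNING),
  new ExecutorDescription("app-0042", 1, 4, ExecutorState.RUNNING))

// toString prints e.g. "ExecutorState(appId=app-0042, execId=0, cores=4, state=RUNNING)"
running.foreach(println)

// The state report a Worker would send to a newly elected Master:
val reply = WorkerSchedulerStateResponse("worker-1", running)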
