diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0678bdd02110..1023ecffe6a1 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1524,7 +1524,16 @@ object SparkContext extends Logging {
throw new SparkException("YARN mode not available ?", e)
}
}
- val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+ val backend = try {
+ val clazz =
+ Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
+ val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
+ cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
+ } catch {
+ case e: Exception => {
+ throw new SparkException("YARN mode not available ?", e)
+ }
+ }
scheduler.initialize(backend)
scheduler
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 6a6d8e609bc3..e41e0a984169 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend {
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
throw new UnsupportedOperationException
+ def isReady(): Boolean = true
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 17292b4c15b8..7cbb83a15626 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -145,6 +145,10 @@ private[spark] class TaskSchedulerImpl(
}
}
+ override def postStartHook() {
+ waitBackendReady()
+ }
+
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
@@ -431,6 +435,17 @@ private[spark] class TaskSchedulerImpl(
// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None
+
+ private def waitBackendReady(): Unit = {
+ if (backend.isReady) {
+ return
+ }
+ while (!backend.isReady) {
+ synchronized {
+ this.wait(100)
+ }
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e47a060683a2..b8f3b7bb85cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
+ var totalExpectedExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+ // Submit tasks only after (registered executors / total expected executors)
+ // is equal to at least this value, that is double between 0 and 1.
+ var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
+ if (minRegisteredRatio > 1) minRegisteredRatio = 1
+ // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds).
+ val maxRegisteredWaitingTime =
+ conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000)
+ val createTime = System.currentTimeMillis()
+ var ready = if (minRegisteredRatio <= 0) true else false
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
@@ -83,6 +93,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
executorAddress(executorId) = sender.path.address
addressToExecutorId(sender.path.address) = executorId
totalCoreCount.addAndGet(cores)
+ if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) {
+ ready = true
+ logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " +
+ executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() +
+ ", minRegisteredExecutorsRatio: " + minRegisteredRatio)
+ }
makeOffers()
}
@@ -244,6 +260,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
throw new SparkException("Error notifying standalone scheduler's driver actor", e)
}
}
+
+ override def isReady(): Boolean = {
+ if (ready) {
+ return true
+ }
+ if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
+ ready = true
+ logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
+ "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime)
+ return true
+ }
+ false
+ }
}
private[spark] object CoarseGrainedSchedulerBackend {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c07b3f7b695..bf2dc88e2904 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int,
memory: Int) {
+ totalExpectedExecutors.addAndGet(1)
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
}
diff --git a/docs/configuration.md b/docs/configuration.md
index b84104cc7e65..3f4296a2aa94 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -699,6 +699,25 @@ Apart from these, the following properties are also available, and may be useful
(in milliseconds)
+
+
spark.scheduler.minRegisteredExecutorsRatio |
+ 0 |
+
+ The minimum ratio of registered executors (registered executors / total expected executors)
+ to wait for before scheduling begins. Specified as a double between 0 and 1.
+ Regardless of whether the minimum ratio of executors has been reached,
+ the maximum amount of time it will wait before scheduling begins is controlled by config
+ spark.scheduler.maxRegisteredExecutorsWaitingTime
+ |
+
+
+ spark.scheduler.maxRegisteredExecutorsWaitingTime |
+ 30000 |
+
+ Maximum amount of time to wait for executors to register before scheduling begins
+ (in milliseconds).
+ |
+
#### Security
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1cc9c33cd2d0..c0f0595c94a0 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -184,6 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
+ System.setProperty("spark.executor.instances", args.numExecutors.toString)
val mainMethod = Class.forName(
args.userClass,
false /* initialize */ ,
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
index 25cc9016b10a..4c383ab574ab 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala
@@ -26,7 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorCores = 1
- var numExecutors = 2
+ var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
parseArgs(args.toList)
@@ -93,3 +93,7 @@ class ApplicationMasterArguments(val args: Array[String]) {
System.exit(exitCode)
}
}
+
+object ApplicationMasterArguments {
+ val DEFAULT_NUMBER_EXECUTORS = 2
+}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 6b91e6b9eb89..15e8c21aa590 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -40,8 +40,10 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
override def postStartHook() {
+ super.postStartHook()
// The yarn application is running, but the executor might not yet ready
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+ // TODO It needn't after waitBackendReady
Thread.sleep(2000L)
logInfo("YarnClientClusterScheduler.postStartHook done")
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 039cf4f27611..0ad82a70d055 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -77,6 +77,7 @@ private[spark] class YarnClientSchedulerBackend(
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
+ totalExpectedExecutors.set(args.numExecutors)
client = new Client(args, conf)
appId = client.runApp()
waitForApp()
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 39cdd2e8a522..9ee53d797c8e 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -48,9 +48,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
override def postStartHook() {
val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+ super.postStartHook()
if (sparkContextInitialized){
ApplicationMaster.waitForInitialAllocations()
// Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
+ // TODO It needn't after waitBackendReady
Thread.sleep(3000L)
}
logInfo("YarnClusterScheduler.postStartHook done")
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
new file mode 100644
index 000000000000..a04b08f43cc5
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark.SparkContext
+import org.apache.spark.deploy.yarn.ApplicationMasterArguments
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.IntParam
+
+private[spark] class YarnClusterSchedulerBackend(
+ scheduler: TaskSchedulerImpl,
+ sc: SparkContext)
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+
+ override def start() {
+ super.start()
+ var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS
+ if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) {
+ numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors)
+ }
+ // System property can override environment variable.
+ numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors)
+ totalExpectedExecutors.set(numExecutors)
+ }
+}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6244332f2373..797c7895847d 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
+ System.setProperty("spark.executor.instances", args.numExecutors.toString)
val mainMethod = Class.forName(
args.userClass,
false,