diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 9feafc99ac07..2a4c70936e92 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -456,8 +456,6 @@ object SparkSubmit {
 
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
-      OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
-        sysProp = "spark.executor.instances"),
       OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
       OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
       OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -469,6 +467,8 @@ object SparkSubmit {
         sysProp = "spark.executor.cores"),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
+      OptionAssigner(args.numExecutors, STANDALONE | YARN, ALL_DEPLOY_MODES,
+        sysProp = "spark.executor.instances"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
         sysProp = "spark.cores.max"),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f8aac3008cef..40e1a57049c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -637,6 +637,16 @@ private[deploy] class Master(
       }
       freeWorkers = freeWorkers.filter(canLaunchExecutor)
     }
+
+    val numExecutorsScheduled = assignedExecutors.sum
+    val numExecutorsLaunched = app.executors.size
+    // Check to see if we managed to launch the requested number of executors
+    if (numUsable != 0 && numExecutorsLaunched != app.executorLimit &&
+      numExecutorsScheduled != app.executorLimit) {
+      logWarning(s"Failed to launch the requested number of executors due to resource limits: " +
+        s"only $numExecutorsScheduled executors instead of ${app.executorLimit}")
+    }
+
     assignedCores
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8382fbe9ddb8..23b75e235046 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -99,7 +99,7 @@ private[spark] class StandaloneSchedulerBackend(
       if (Utils.isDynamicAllocationEnabled(conf)) {
         Some(0)
       } else {
-        None
+        sc.conf.getOption("spark.executor.instances").map(_.toInt)
       }
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
       appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
index 6767cc507964..a031cce3ce74 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -57,6 +57,7 @@ class SparkSubmitOptionParser {
   protected final String PY_FILES = "--py-files";
   protected final String REPOSITORIES = "--repositories";
   protected final String STATUS = "--status";
+  protected final String EXECUTOR_CORES = "--executor-cores";
   protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
 
   // Options that do not take arguments.
@@ -70,12 +71,13 @@ class SparkSubmitOptionParser {
 
   // YARN-only options.
   protected final String ARCHIVES = "--archives";
-  protected final String EXECUTOR_CORES = "--executor-cores";
   protected final String KEYTAB = "--keytab";
-  protected final String NUM_EXECUTORS = "--num-executors";
   protected final String PRINCIPAL = "--principal";
   protected final String QUEUE = "--queue";
 
+  // Standalone and YARN options.
+  protected final String NUM_EXECUTORS = "--num-executors";
+
   /**
    * This is the canonical list of spark-submit options. Each entry in the array contains the
    * different aliases for the same option; the first element of each entry is the "official"
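Taken together, these changes make `--num-executors` (and the `spark.executor.instances` property it maps to) apply to standalone mode as well as YARN: `SparkSubmit` now assigns the property for both cluster managers, `StandaloneSchedulerBackend` passes it to the `Master` as the application's initial executor limit, and the `Master` warns when the cluster cannot satisfy the request. Below is a minimal sketch of how an application might exercise the new limit; it assumes a cluster running with this patch, and the master URL and app name are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: assumes this patch is applied and a standalone master is
// running at spark://host:7077 (placeholder URL). Setting
// spark.executor.instances here is equivalent to `--num-executors 2`.
object NumExecutorsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://host:7077")
      .setAppName("num-executors-demo")
      .set("spark.executor.instances", "2") // cap the app at 2 executors

    val sc = new SparkContext(conf)
    // With the patch, StandaloneSchedulerBackend forwards the limit to the
    // Master via ApplicationDescription's initialExecutorLimit, so at most
    // two executors are launched; the Master logs the new warning if
    // resource limits prevent even that many.
    println(sc.parallelize(1 to 100).sum())
    sc.stop()
  }
}
```

Note that dynamic allocation still takes precedence: when `spark.dynamicAllocation.enabled` is set, the backend keeps its initial executor limit of 0 and `spark.executor.instances` is not consulted for the limit.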