From f45a6732e30c7ae374089d5a63146a03b7a40671 Mon Sep 17 00:00:00 2001
From: Jonathan Taws
Date: Fri, 24 Jun 2016 12:23:33 +0200
Subject: [PATCH 1/5] [SPARK-15917] Added support for number of executors for
 Standalone mode

---
 .../main/scala/org/apache/spark/deploy/SparkSubmit.scala   | 4 ++--
 .../scheduler/cluster/StandaloneSchedulerBackend.scala     | 2 +-
 .../org/apache/spark/launcher/SparkSubmitOptionParser.java | 6 ++++--
 3 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 9feafc99ac07..2a4c70936e92 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -456,8 +456,6 @@ object SparkSubmit {
 
     // Yarn only
     OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
-    OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
-      sysProp = "spark.executor.instances"),
     OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
     OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
     OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -469,6 +467,8 @@ object SparkSubmit {
       sysProp = "spark.executor.cores"),
     OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
       sysProp = "spark.executor.memory"),
+    OptionAssigner(args.numExecutors, STANDALONE | YARN, ALL_DEPLOY_MODES,
+      sysProp = "spark.executor.instances"),
     OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
       sysProp = "spark.cores.max"),
     OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8382fbe9ddb8..23b75e235046 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -99,7 +99,7 @@ private[spark] class StandaloneSchedulerBackend(
       if (Utils.isDynamicAllocationEnabled(conf)) {
         Some(0)
       } else {
-        None
+        sc.conf.getOption("spark.executor.instances").map(_.toInt)
       }
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
       appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
index 6767cc507964..a031cce3ce74 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -57,6 +57,7 @@ class SparkSubmitOptionParser {
   protected final String PY_FILES = "--py-files";
   protected final String REPOSITORIES = "--repositories";
   protected final String STATUS = "--status";
+  protected final String EXECUTOR_CORES = "--executor-cores";
   protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
 
   // Options that do not take arguments.
@@ -70,12 +71,13 @@ class SparkSubmitOptionParser {
 
   // YARN-only options.
   protected final String ARCHIVES = "--archives";
-  protected final String EXECUTOR_CORES = "--executor-cores";
   protected final String KEYTAB = "--keytab";
-  protected final String NUM_EXECUTORS = "--num-executors";
   protected final String PRINCIPAL = "--principal";
   protected final String QUEUE = "--queue";
 
+  // Standalone and YARN options.
+  protected final String NUM_EXECUTORS = "--num-executors";
+
   /**
    * This is the canonical list of spark-submit options. Each entry in the array contains the
    * different aliases for the same option; the first element of each entry is the "official"

From d0b1a71cc1106413fdedcc1c658aa4830b1122f0 Mon Sep 17 00:00:00 2001
From: JonathanTaws
Date: Tue, 4 Oct 2016 15:32:03 +0200
Subject: [PATCH 2/5] [SPARK-15917] Added warning message if requested number
 of executors can't be satisfied

---
 .../scala/org/apache/spark/deploy/master/Master.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f8aac3008cef..e0290596bef3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -637,6 +637,14 @@ private[deploy] class Master(
       }
       freeWorkers = freeWorkers.filter(canLaunchExecutor)
     }
+
+    // Check to see if we managed to launch the requested number of executors
+    val numExecutorsLaunched = assignedExecutors.sum
+    if(numExecutorsLaunched != app.executorLimit) {
+      logWarning(s"Failed to launch the requested number of executors due to resource limits : " +
+        s"only $numExecutorsLaunched executors instead of ${app.executorLimit}")
+    }
+
     assignedCores
   }
 

From 0af7b10c42c73d8ee9a0e49e9b652946274d1bae Mon Sep 17 00:00:00 2001
From: JonathanTaws
Date: Sun, 9 Oct 2016 12:43:29 +0200
Subject: [PATCH 3/5] Added check on number of workers to avoid displaying the
 same message multiple times

---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e0290596bef3..523acc7a024a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -640,7 +640,7 @@ private[deploy] class Master(
 
     // Check to see if we managed to launch the requested number of executors
     val numExecutorsLaunched = assignedExecutors.sum
-    if(numExecutorsLaunched != app.executorLimit) {
+    if(numUsable != 0 && numExecutorsLaunched != app.executorLimit) {
       logWarning(s"Failed to launch the requested number of executors due to resource limits : " +
         s"only $numExecutorsLaunched executors instead of ${app.executorLimit}")
     }

From eed3ecd91e3c84c0e17c513e4b48b92f6b1532f0 Mon Sep 17 00:00:00 2001
From: JonathanTaws
Date: Sun, 9 Oct 2016 14:30:57 +0200
Subject: [PATCH 4/5] Improved check on num executors warning message

---
 .../scala/org/apache/spark/deploy/master/Master.scala | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 523acc7a024a..4b23c669549d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -638,11 +638,13 @@ private[deploy] class Master(
       freeWorkers = freeWorkers.filter(canLaunchExecutor)
     }
 
+    val numExecutorsScheduled = assignedExecutors.sum
+    val numExecutorsLaunched = app.executors.size
     // Check to see if we managed to launch the requested number of executors
-    val numExecutorsLaunched = assignedExecutors.sum
-    if(numUsable != 0 && numExecutorsLaunched != app.executorLimit) {
+    if(numUsable != 0 && numExecutorsLaunched != app.executorLimit &&
+      numExecutorsScheduled != app.executorLimit) {
       logWarning(s"Failed to launch the requested number of executors due to resource limits : " +
-        s"only $numExecutorsLaunched executors instead of ${app.executorLimit}")
+        s"only $numExecutorsScheduled executors instead of ${app.executorLimit}")
     }
 
     assignedCores

From bffedac0756c98861f44dfa0967ec8477c63c4cc Mon Sep 17 00:00:00 2001
From: JonathanTaws
Date: Mon, 10 Oct 2016 22:54:56 +0200
Subject: [PATCH 5/5] Corrected style mistake on if statement

---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 4b23c669549d..40e1a57049c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -641,7 +641,7 @@ private[deploy] class Master(
     val numExecutorsScheduled = assignedExecutors.sum
     val numExecutorsLaunched = app.executors.size
     // Check to see if we managed to launch the requested number of executors
-    if(numUsable != 0 && numExecutorsLaunched != app.executorLimit &&
+    if (numUsable != 0 && numExecutorsLaunched != app.executorLimit &&
      numExecutorsScheduled != app.executorLimit) {
       logWarning(s"Failed to launch the requested number of executors due to resource limits : " +
         s"only $numExecutorsScheduled executors instead of ${app.executorLimit}")
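Reviewer note (not part of the patches themselves): after this series, --num-executors passed to spark-submit against a standalone master is translated to spark.executor.instances, which StandaloneSchedulerBackend then uses as the initial executor limit when dynamic allocation is disabled. Below is a minimal sketch of the resulting behavior; the SparkConf keys and APIs are real Spark ones, but the master URL, app name, object name, and executor count are placeholder values, and the limit computation is a simplified restatement of patch 1, not the exact code path.

import org.apache.spark.{SparkConf, SparkContext}

object NumExecutorsDemo {
  def main(args: Array[String]): Unit = {
    // Equivalent to: spark-submit --master spark://host:7077 --num-executors 4 ...
    // (spark://host:7077 is a hypothetical standalone master URL)
    val conf = new SparkConf()
      .setMaster("spark://host:7077")
      .setAppName("num-executors-demo")
      .set("spark.executor.instances", "4") // what --num-executors now sets

    val sc = new SparkContext(conf)

    // Simplified view of the new logic from patch 1: with dynamic allocation
    // off, the app's initial executor limit comes from
    // spark.executor.instances instead of defaulting to None (unlimited).
    val initialExecutorLimit: Option[Int] =
      if (sc.getConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
        Some(0)
      } else {
        sc.getConf.getOption("spark.executor.instances").map(_.toInt)
      }
    assert(initialExecutorLimit.contains(4))

    sc.stop()
  }
}

If the cluster cannot satisfy the limit (say, only three workers have free cores and memory), the Master still schedules as many executors as it can; the warning added in patches 2 through 5 simply makes the shortfall visible in the logs.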