4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -456,8 +456,6 @@ object SparkSubmit {

// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
- OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
-   sysProp = "spark.executor.instances"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"),
@@ -469,6 +467,8 @@ object SparkSubmit {
sysProp = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
+ OptionAssigner(args.numExecutors, STANDALONE | YARN, ALL_DEPLOY_MODES,
+   sysProp = "spark.executor.instances"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
sysProp = "spark.cores.max"),
OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, ALL_DEPLOY_MODES,
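With this assigner moved out of the YARN-only group, spark.executor.instances (the property behind --num-executors) is now forwarded for standalone masters as well as YARN. A minimal sketch of the programmatic equivalent, assuming a standalone master at a placeholder URL (not part of the PR):

import org.apache.spark.SparkConf

// Cap the application at 4 executors on a standalone cluster.
// "spark://master-host:7077" is a placeholder master URL.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")
  .setAppName("num-executors-demo")
  .set("spark.executor.instances", "4")  // same property that --num-executors sets via spark-submit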
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -637,6 +637,16 @@ private[deploy] class Master(
}
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}

+ val numExecutorsScheduled = assignedExecutors.sum
+ val numExecutorsLaunched = app.executors.size
+ // Check to see if we managed to launch the requested number of executors
+ if (numUsable != 0 && numExecutorsLaunched != app.executorLimit &&
+   numExecutorsScheduled != app.executorLimit) {
Contributor:
How are numExecutorsLaunched and numExecutorsScheduled related to each other? Also, we probably want to do an inequality check here just in case.
Also, style: there needs to be a space after the if.

Contributor:
Another thing: how noisy is this? Do we log this when dynamic allocation is turned on (we shouldn't)?

Author:
numExecutorsLaunched is the actual number of executors launched so far (literally those registered in the executors list of the ApplicationInfo), whereas numExecutorsScheduled is the number of executors scheduled/allocated by scheduleExecutorsOnWorkers. The check is needed because scheduleExecutorsOnWorkers is called multiple times while the executors are being set up, and without it we would repeatedly log the same message with incorrect information (such as "0 executors launched" even though the executors had already been launched).
Tell me if that doesn't make sense; I did a lot of trial and error before arriving at this condition.

Author:
Regarding the noise produced, it should be quite minimal: when it's not possible to launch the requested number of executors, only one warning is logged.
With dynamic allocation on, a message is logged when an initial number of executors is specified and cannot be satisfied. I don't think that's much of a problem, since there is currently no warning at all for that case, but I can add a check to suppress the warning when dynamic allocation is enabled if you prefer.

logWarning(s"Failed to launch the requested number of executors due to resource limits : " +
s"only $numExecutorsScheduled executors instead of ${app.executorLimit}")
}

assignedCores
}

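Taken out of context, the new check behaves roughly like the sketch below (simplified stand-in names, not the actual Master code). When the cluster cannot satisfy the request, it emits a single warning built from the template in the diff, e.g. "... only 2 executors instead of 5".

// Standalone sketch of the guard added above; names are simplified stand-ins.
// assignedExecutors(i) = executors scheduled on worker i in this pass,
// numLaunched = executors already registered for the app, limit = app.executorLimit.
def warnIfUnderProvisioned(assignedExecutors: Array[Int], numLaunched: Int,
    limit: Int, numUsable: Int): Unit = {
  val numScheduled = assignedExecutors.sum
  if (numUsable != 0 && numLaunched != limit && numScheduled != limit) {
    println(s"Failed to launch the requested number of executors due to resource limits : " +
      s"only $numScheduled executors instead of $limit")
  }
}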
@@ -99,7 +99,7 @@ private[spark] class StandaloneSchedulerBackend(
if (Utils.isDynamicAllocationEnabled(conf)) {
Some(0)
} else {
- None
+ sc.conf.getOption("spark.executor.instances").map(_.toInt)
}
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
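The effect of this one-line change is that, with dynamic allocation off, the initial executor limit now comes from spark.executor.instances instead of being left unset. A self-contained sketch of the selection logic, with a plain Map standing in for SparkConf (an assumption for illustration, not the backend's actual code):

// How the initial executor limit is chosen after this change (sketch only).
def initialExecutorLimit(conf: Map[String, String], dynamicAllocationEnabled: Boolean): Option[Int] =
  if (dynamicAllocationEnabled) {
    Some(0)  // start with no executors; the allocation manager scales up later
  } else {
    conf.get("spark.executor.instances").map(_.toInt)  // honor --num-executors if set, otherwise no explicit limit
  }

// initialExecutorLimit(Map("spark.executor.instances" -> "4"), dynamicAllocationEnabled = false) == Some(4)
// initialExecutorLimit(Map.empty, dynamicAllocationEnabled = false) == None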
@@ -57,6 +57,7 @@ class SparkSubmitOptionParser {
protected final String PY_FILES = "--py-files";
protected final String REPOSITORIES = "--repositories";
protected final String STATUS = "--status";
+ protected final String EXECUTOR_CORES = "--executor-cores";
protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";

// Options that do not take arguments.
@@ -70,12 +71,13 @@

// YARN-only options.
protected final String ARCHIVES = "--archives";
- protected final String EXECUTOR_CORES = "--executor-cores";
protected final String KEYTAB = "--keytab";
- protected final String NUM_EXECUTORS = "--num-executors";
protected final String PRINCIPAL = "--principal";
protected final String QUEUE = "--queue";

+ // Standalone and YARN options.
+ protected final String NUM_EXECUTORS = "--num-executors";

/**
* This is the canonical list of spark-submit options. Each entry in the array contains the
* different aliases for the same option; the first element of each entry is the "official"