
Commit d8bcc8e

Add way to limit default # of cores used by applications on standalone mode
Also documents the spark.deploy.spreadOut option.
1 parent 15d9534 commit d8bcc8e

File tree

8 files changed: +60 −14 lines

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 6 additions & 1 deletion
@@ -67,7 +67,7 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
 
   /** Set JAR files to distribute to the cluster. */
   def setJars(jars: Seq[String]): SparkConf = {
-    for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
+    for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
     set("spark.jars", jars.filter(_ != null).mkString(","))
   }
 
@@ -165,6 +165,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with
     getOption(key).map(_.toDouble).getOrElse(defaultValue)
   }
 
+  /** Get a parameter as a boolean, falling back to a default if not set */
+  def getBoolean(key: String, defaultValue: Boolean): Boolean = {
+    getOption(key).map(_.toBoolean).getOrElse(defaultValue)
+  }
+
   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
     val prefix = "spark.executorEnv."
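
The new getBoolean accessor follows the same getOption-based pattern as the existing getInt/getDouble helpers. A minimal usage sketch, not part of this commit (the keys shown are the ones the change introduces):

    import org.apache.spark.SparkConf

    // Read the two standalone-master settings added by this commit, with their defaults.
    val conf = new SparkConf(true)  // loadDefaults = true, per the constructor above
    val spreadOut: Boolean = conf.getBoolean("spark.deploy.spreadOut", true)
    val defaultCores: Int = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)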

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ class SparkContext(
     throw new SparkException("An application must be set in your configuration")
   }
 
-  if (conf.get("spark.log-conf", "false").toBoolean) {
+  if (conf.get("spark.logConf", "false").toBoolean) {
     logInfo("Spark configuration:\n" + conf.toDebugString)
   }
 
core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

Lines changed: 5 additions & 2 deletions
@@ -28,7 +28,8 @@ private[spark] class ApplicationInfo(
     val desc: ApplicationDescription,
     val submitDate: Date,
     val driver: ActorRef,
-    val appUiUrl: String)
+    val appUiUrl: String,
+    defaultCores: Int)
   extends Serializable {
 
   @transient var state: ApplicationState.Value = _
@@ -81,7 +82,9 @@
     }
   }
 
-  def coresLeft: Int = desc.maxCores - coresGranted
+  private val myMaxCores = if (desc.maxCores == Int.MaxValue) defaultCores else desc.maxCores
+
+  def coresLeft: Int = myMaxCores - coresGranted
 
   private var _retryCount = 0
 
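
The heart of the change is this fallback: an application that never sets spark.cores.max arrives with desc.maxCores == Int.MaxValue and is then capped by the master-wide defaultCores. A standalone illustration of that rule (a sketch, not code from the commit; names are illustrative):

    // Mirrors the myMaxCores fallback above.
    def effectiveMaxCores(requestedMaxCores: Int, masterDefaultCores: Int): Int =
      if (requestedMaxCores == Int.MaxValue) masterDefaultCores else requestedMaxCores

    effectiveMaxCores(Int.MaxValue, 16)  // app with no spark.cores.max, master default 16 => 16
    effectiveMaxCores(8, 16)             // app that asked for 8 cores keeps its own limit => 8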

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 6 additions & 2 deletions
@@ -88,7 +88,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
-  val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean
+  val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
+
+  // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue)
+  val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue)
 
   override def preStart() {
     logInfo("Starting Spark master at " + masterUrl)
@@ -426,7 +429,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
-    new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    new ApplicationInfo(
+      now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores)
   }
 
   def registerApplication(app: ApplicationInfo): Unit = {
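
Because the master reads defaultCores from its own SparkConf, a spark.deploy.defaultCores value in the master JVM's system properties (for example set through SPARK_JAVA_OPTS, as the docs below describe) is picked up at startup. A rough sketch of that lookup, assuming a default of 8 was passed on the command line:

    import org.apache.spark.SparkConf

    // Simulate a master launched with -Dspark.deploy.defaultCores=8 (illustration only).
    System.setProperty("spark.deploy.defaultCores", "8")
    val masterConf = new SparkConf(true)  // loadDefaults = true pulls in spark.* system properties
    val defaultCores = masterConf.getInt("spark.deploy.defaultCores", Int.MaxValue)  // => 8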

docs/configuration.md

Lines changed: 29 additions & 4 deletions
@@ -77,13 +77,14 @@ there are at least five properties that you will commonly want to control:
 </tr>
 <tr>
   <td>spark.cores.max</td>
-  <td>(infinite)</td>
+  <td>(not set)</td>
   <td>
     When running on a <a href="spark-standalone.html">standalone deploy cluster</a> or a
     <a href="running-on-mesos.html#mesos-run-modes">Mesos cluster in "coarse-grained"
     sharing mode</a>, the maximum amount of CPU cores to request for the application from
-    across the cluster (not from each machine). The default will use all available cores
-    offered by the cluster manager.
+    across the cluster (not from each machine). If not set, the default will be
+    <code>spark.deploy.defaultCores</code> on Spark's standalone cluster manager, or
+    infinite (all available cores) on Mesos.
   </td>
 </tr>
 </table>
@@ -404,12 +405,36 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td>spark.log-conf</td>
+  <td>spark.logConf</td>
   <td>false</td>
   <td>
     Log the supplied SparkConf as INFO at start of spark context.
   </td>
 </tr>
+<tr>
+  <td>spark.deploy.spreadOut</td>
+  <td>true</td>
+  <td>
+    Whether the standalone cluster manager should spread applications out across nodes or try
+    to consolidate them onto as few nodes as possible. Spreading out is usually better for
+    data locality in HDFS, but consolidating is more efficient for compute-intensive workloads. <br/>
+    <b>Note:</b> this setting needs to be configured in the cluster master, not in individual
+    applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+  </td>
+</tr>
+<tr>
+  <td>spark.deploy.defaultCores</td>
+  <td>(infinite)</td>
+  <td>
+    Default number of cores to give to applications in Spark's standalone mode if they don't
+    set <code>spark.cores.max</code>. If not set, applications always get all available
+    cores unless they configure <code>spark.cores.max</code> themselves.
+    Set this lower on a shared cluster to prevent users from grabbing
+    the whole cluster by default. <br/>
+    <b>Note:</b> this setting needs to be configured in the cluster master, not in individual
+    applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
+  </td>
+</tr>
 </table>
 
 ## Viewing Spark Properties
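
For contrast with the new master-side default, an application that wants an explicit cap still sets spark.cores.max on its own SparkConf. A hedged sketch (the master URL, app name, and the value 16 are illustrative, not from this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf(true)
      .setMaster("spark://master:7077")   // illustrative standalone master URL
      .setAppName("CappedApp")            // illustrative application name
      .set("spark.cores.max", "16")       // per-app cap; takes precedence over spark.deploy.defaultCores
      .set("spark.logConf", "true")       // log the resolved configuration at startup (renamed key)
    val sc = new SparkContext(conf)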

docs/css/bootstrap.min.css

Lines changed: 1 addition & 1 deletion
Generated file; diff not rendered.

docs/job-scheduling.md

Lines changed: 2 additions & 3 deletions
@@ -32,9 +32,8 @@ Resource allocation can be configured as follows, based on the cluster type:
 
 * **Standalone mode:** By default, applications submitted to the standalone mode cluster will run in
   FIFO (first-in-first-out) order, and each application will try to use all available nodes. You can limit
-  the number of nodes an application uses by setting the `spark.cores.max` configuration property in it. This
-  will allow multiple users/applications to run concurrently. For example, you might launch a long-running
-  server that uses 10 cores, and allow users to launch shells that use 20 cores each.
+  the number of nodes an application uses by setting the `spark.cores.max` configuration property in it,
+  or change the default for applications that don't set this setting through `spark.deploy.defaultCores`.
   Finally, in addition to controlling cores, each application's `spark.executor.memory` setting controls
   its memory use.
 * **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`,

docs/spark-standalone.md

Lines changed: 10 additions & 0 deletions
@@ -167,6 +167,16 @@ val conf = new SparkConf()
 val sc = new SparkContext(conf)
 {% endhighlight %}
 
+In addition, you can configure `spark.deploy.defaultCores` on the cluster master process to change the
+default for applications that don't set `spark.cores.max` to something less than infinite.
+Do this by adding the following to `conf/spark-env.sh`:
+
+{% highlight bash %}
+export SPARK_JAVA_OPTS="-Dspark.deploy.defaultCores=<value>"
+{% endhighlight %}
+
+This is useful on shared clusters where users might not have configured a maximum number of cores
+individually.
 
 # Monitoring and Logging
 
