
Commit 15c03e1

Andrew Or committed

[SPARK-4140] Document dynamic allocation

Once the external shuffle service is also documented, the dynamic allocation section will link to it. Let me know if the whole dynamic allocation section should be moved to its own page; I personally think the organization might be cleaner that way. This patch builds on top of oza's work in #3689. aarondav pwendell

Author: Andrew Or <[email protected]>
Author: Tsuyoshi Ozawa <[email protected]>

Closes #3731 from andrewor14/document-dynamic-allocation and squashes the following commits:

1281447 [Andrew Or] Address a few comments
b9843f2 [Andrew Or] Document the configs as well
246fb44 [Andrew Or] Merge branch 'SPARK-4839' of github.com:oza/spark into document-dynamic-allocation
8c64004 [Andrew Or] Add documentation for dynamic allocation (without configs)
6827b56 [Tsuyoshi Ozawa] Fixing a documentation of spark.dynamicAllocation.enabled.
53cff58 [Tsuyoshi Ozawa] Adding a documentation about dynamic resource allocation.
1 parent 7cb3f54 commit 15c03e1

2 files changed: +169 -0 lines changed

docs/configuration.md

Lines changed: 61 additions & 0 deletions
@@ -1008,6 +1008,67 @@ Apart from these, the following properties are also available, and may be useful
</tr>
</table>

#### Dynamic allocation
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
<tr>
  <td><code>spark.dynamicAllocation.enabled</code></td>
  <td>false</td>
  <td>
    Whether to use dynamic resource allocation, which scales the number of executors registered
    with this application up and down based on the workload. Note that this is currently only
    available in YARN mode. For more detail, see the description
    <a href="job-scheduling.html#dynamic-resource-allocation">here</a>.
    <br><br>
    This requires the following configurations to be set:
    <code>spark.dynamicAllocation.minExecutors</code>,
    <code>spark.dynamicAllocation.maxExecutors</code>, and
    <code>spark.shuffle.service.enabled</code>
  </td>
</tr>
<tr>
  <td><code>spark.dynamicAllocation.minExecutors</code></td>
  <td>(none)</td>
  <td>
    Lower bound for the number of executors if dynamic allocation is enabled (required).
  </td>
</tr>
<tr>
  <td><code>spark.dynamicAllocation.maxExecutors</code></td>
  <td>(none)</td>
  <td>
    Upper bound for the number of executors if dynamic allocation is enabled (required).
  </td>
</tr>
<tr>
  <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
  <td>60</td>
  <td>
    If dynamic allocation is enabled and there have been pending tasks backlogged for more than
    this duration (in seconds), new executors will be requested. For more detail, see this
    <a href="job-scheduling.html#resource-allocation-policy">description</a>.
  </td>
</tr>
<tr>
  <td><code>spark.dynamicAllocation.sustainedSchedulerBacklogTimeout</code></td>
  <td><code>schedulerBacklogTimeout</code></td>
  <td>
    Same as <code>spark.dynamicAllocation.schedulerBacklogTimeout</code>, but used only for
    subsequent executor requests. For more detail, see this
    <a href="job-scheduling.html#resource-allocation-policy">description</a>.
  </td>
</tr>
<tr>
  <td><code>spark.dynamicAllocation.executorIdleTimeout</code></td>
  <td>600</td>
  <td>
    If dynamic allocation is enabled and an executor has been idle for more than this duration
    (in seconds), the executor will be removed. For more detail, see this
    <a href="job-scheduling.html#resource-allocation-policy">description</a>.
  </td>
</tr>
</table>
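
For example, an application might set these properties programmatically through `SparkConf` before creating its `SparkContext`. This is a minimal sketch; the application name and the values are illustrative and should be tuned for your workload:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative configuration: enable dynamic allocation with executor bounds
// and the external shuffle service that it depends on.
val conf = new SparkConf()
  .setAppName("DynamicAllocationExample")                     // hypothetical application name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")           // required lower bound
  .set("spark.dynamicAllocation.maxExecutors", "20")          // required upper bound
  .set("spark.shuffle.service.enabled", "true")               // required external shuffle service
  .set("spark.dynamicAllocation.executorIdleTimeout", "600")  // optional, in seconds
val sc = new SparkContext(conf)
```

The same properties can also be set in `spark-defaults.conf` or passed to `spark-submit` with `--conf`.
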

#### Security
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>

docs/job-scheduling.md

Lines changed: 108 additions & 0 deletions
@@ -56,6 +56,114 @@ the same RDDs. For example, the [Shark](http://shark.cs.berkeley.edu) JDBC serve
queries. In future releases, in-memory storage systems such as [Tachyon](http://tachyon-project.org) will
provide another approach to share RDDs.

## Dynamic Resource Allocation

Spark 1.2 introduces the ability to dynamically scale the set of cluster resources allocated to
your application up and down based on the workload. This means that your application may give
resources back to the cluster if they are no longer used and request them again later when there
is demand. This feature is particularly useful if multiple applications share resources in your
Spark cluster. If a subset of the resources allocated to an application becomes idle, it can be
returned to the cluster's pool of resources and acquired by other applications. In Spark, dynamic
resource allocation is performed at the granularity of the executor and can be enabled through
`spark.dynamicAllocation.enabled`.

This feature is currently disabled by default and available only on [YARN](running-on-yarn.html).
A future release will extend this to [standalone mode](spark-standalone.html) and
[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes). Note that although Spark on
Mesos already has a similar notion of dynamic resource sharing in fine-grained mode, enabling
dynamic allocation allows your Mesos application to take advantage of coarse-grained low-latency
scheduling while sharing cluster resources efficiently.

### Configuration and Setup

All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
provide lower and upper bounds for the number of executors through
`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
configurations are described on the [configurations page](configuration.html#dynamic-allocation)
and in the subsequent sections in detail.

Additionally, your application must use an external shuffle service. The purpose of the service is
to preserve the shuffle files written by executors so the executors can be safely removed (described
in more detail [below](job-scheduling.html#graceful-decommission-of-executors)). To enable
this service, set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service
is implemented in `org.apache.spark.yarn.network.YarnShuffleService`, which runs in each `NodeManager`
in your cluster. To start this service, follow these steps:

1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
pre-packaged distribution.
2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under
`$SPARK_HOME/network/yarn/target/scala-<version>` if you are building Spark yourself, and under
`lib` if you are using a distribution.
3. Add this jar to the classpath of all `NodeManager`s in your cluster.
4. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
`org.apache.spark.yarn.network.YarnShuffleService`. Additionally, set all relevant
`spark.shuffle.service.*` [configurations](configuration.html).
5. Restart all `NodeManager`s in your cluster.

### Resource Allocation Policy

At a high level, Spark should relinquish executors when they are no longer used and acquire
executors when they are needed. Since there is no definitive way to predict whether an executor
that is about to be removed will run a task in the near future, or whether a new executor that is
about to be added will actually be idle, we need a set of heuristics to determine when to remove
and request executors.

#### Request Policy

A Spark application with dynamic allocation enabled requests additional executors when it has
pending tasks waiting to be scheduled. This condition necessarily implies that the existing set
of executors is insufficient to simultaneously saturate all tasks that have been submitted but
not yet finished.

Spark requests executors in rounds. The actual request is triggered when there have been pending
tasks for `spark.dynamicAllocation.schedulerBacklogTimeout` seconds, and then triggered again
every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` seconds thereafter if the queue
of pending tasks persists. Additionally, the number of executors requested in each round increases
exponentially from the previous round. For instance, an application will add 1 executor in the
first round, and then 2, 4, 8 and so on executors in the subsequent rounds.

The motivation for an exponential increase policy is twofold. First, an application should request
executors cautiously in the beginning in case it turns out that only a few additional executors are
sufficient. This echoes the justification for TCP slow start. Second, the application should be
able to ramp up its resource usage in a timely manner in case it turns out that many executors are
actually needed.
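
As an illustrative Scala sketch of this ramp-up (not Spark's actual implementation; the helper and parameter names are hypothetical), the number of executors requested in a round might be computed as follows, capped by `spark.dynamicAllocation.maxExecutors`:

```scala
// Hypothetical helper: the first backlogged round requests 1 executor, and each
// subsequent round doubles the previous request, never exceeding maxExecutors.
def executorsToRequest(
    currentExecutors: Int,
    requestedLastRound: Int,  // 0 if this is the first round of the current backlog
    maxExecutors: Int): Int = {
  val desired = if (requestedLastRound == 0) 1 else requestedLastRound * 2
  math.max(0, math.min(desired, maxExecutors - currentExecutors))
}
```

Assuming each request is fully granted, starting from zero executors with `maxExecutors = 10`, successive backlogged rounds would request 1, 2, 4, and then 3 executors before hitting the cap.
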
#### Remove Policy

The policy for removing executors is much simpler. A Spark application removes an executor when
it has been idle for more than `spark.dynamicAllocation.executorIdleTimeout` seconds. Note that,
under most circumstances, this condition is mutually exclusive with the request condition, in that
an executor should not be idle if there are still pending tasks to be scheduled.
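
A minimal sketch of this idle-timeout check (again illustrative only; the helper and parameter names are hypothetical, not Spark's internal API) might track the last activity time of each executor and flag those idle past the timeout:

```scala
// Hypothetical helper: an executor with no running tasks whose last activity
// is older than executorIdleTimeout becomes a candidate for removal.
def executorsToRemove(
    runningTasks: Map[String, Int],   // executorId -> currently running task count
    lastActiveMs: Map[String, Long],  // executorId -> timestamp of last task activity (ms)
    nowMs: Long,
    idleTimeoutSeconds: Long): Seq[String] = {
  lastActiveMs.collect {
    case (execId, lastActive)
        if runningTasks.getOrElse(execId, 0) == 0 &&
           nowMs - lastActive > idleTimeoutSeconds * 1000 =>
      execId
  }.toSeq
}
```
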
### Graceful Decommission of Executors

Before dynamic allocation, a Spark executor exits either on failure or when the associated
application has also exited. In both scenarios, all state associated with the executor is no
longer needed and can be safely discarded. With dynamic allocation, however, the application
is still running when an executor is explicitly removed. If the application attempts to access
state stored in or written by the executor, it will have to recompute the state. Thus,
Spark needs a mechanism to decommission an executor gracefully by preserving its state before
removing it.

This requirement is especially important for shuffles. During a shuffle, the Spark executor first
writes its own map outputs locally to disk, and then acts as the server for those files when other
executors attempt to fetch them. In the event of stragglers, which are tasks that run for much
longer than their peers, dynamic allocation may remove an executor before the shuffle completes,
in which case the shuffle files written by that executor must be recomputed unnecessarily.

The solution for preserving shuffle files is to use an external shuffle service, also introduced
in Spark 1.2. This service is a long-running process that runs on each node of your cluster
independently of your Spark applications and their executors. If the service is enabled, Spark
executors will fetch shuffle files from the service instead of from each other. This means any
shuffle state written by an executor may continue to be served beyond the executor's lifetime.

In addition to writing shuffle files, executors also cache data either on disk or in memory.
When an executor is removed, however, all cached data will no longer be accessible. There is
not yet a solution for this in Spark 1.2. In future releases, the cached data may be
preserved through off-heap storage, similar in spirit to how shuffle files are preserved through
the external shuffle service.

# Scheduling Within an Application