
Commit 6d8f1dd

[SPARK-2546] Clone JobConf for each task (branch-1.0 / 1.1 backport)
This patch attempts to fix SPARK-2546 in `branch-1.0` and `branch-1.1`. The underlying problem is that thread-safety issues in Hadoop Configuration objects may cause Spark tasks to get stuck in infinite loops. The approach taken here is to clone a new copy of the JobConf for each task rather than sharing a single copy between tasks. Note that there are still Configuration thread-safety issues that may affect the driver, but these seem much less likely to occur in practice and will be more complex to fix (see discussion on the SPARK-2546 ticket).

This cloning is guarded by a new configuration option (`spark.hadoop.cloneConf`) and is disabled by default in order to avoid unexpected performance regressions for workloads that are unaffected by the Configuration thread-safety issues.

Author: Josh Rosen <[email protected]>

Closes #2684 from JoshRosen/jobconf-fix-backport and squashes the following commits:

f14f259 [Josh Rosen] Add configuration option to control cloning of Hadoop JobConf.
b562451 [Josh Rosen] Remove unused jobConfCacheKey field.
dd25697 [Josh Rosen] [SPARK-2546] [1.0 / 1.1 backport] Clone JobConf for each task.

(cherry picked from commit 2cd40db)

Signed-off-by: Josh Rosen <[email protected]>

Conflicts:
	docs/configuration.md
1 parent dc18167 commit 6d8f1dd
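For readers who want to try the workaround, the new flag is an ordinary Spark property. Below is a minimal sketch of opting in from application code; the object name, application name, and input path are placeholders, not taken from the commit.

    import org.apache.spark.{SparkConf, SparkContext}

    object CloneConfExample {
      def main(args: Array[String]): Unit = {
        // Opt into per-task cloning of the Hadoop JobConf (off by default)
        // to work around the SPARK-2546 Configuration thread-safety issue.
        val conf = new SparkConf()
          .setAppName("CloneConfExample") // placeholder application name
          .set("spark.hadoop.cloneConf", "true")
        val sc = new SparkContext(conf)

        // Hadoop-backed inputs now build a fresh JobConf clone per task.
        println(sc.textFile("hdfs:///tmp/input").count()) // placeholder path
        sc.stop()
      }
    }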

File tree

2 files changed: +47 −15 lines


core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 38 additions & 15 deletions
@@ -125,27 +125,47 @@ class HadoopRDD[K, V](
   // used to build JobTracker ID
   private val createTime = new Date()
 
+  private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    if (conf.isInstanceOf[JobConf]) {
-      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-      conf.asInstanceOf[JobConf]
-    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local cache of the JobConf
-      // needed by this RDD.
-      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
+      // somewhat rarely because most jobs treat the configuration as though it's immutable. One
+      // solution, implemented here, is to clone the Configuration object. Unfortunately, this
+      // clone can be very expensive. To avoid unexpected performance regressions for workloads and
+      // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
+      // disabled by default.
       HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
         val newJobConf = new JobConf(conf)
-        initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+        if (!conf.isInstanceOf[JobConf]) {
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        }
         newJobConf
       }
+    } else {
+      if (conf.isInstanceOf[JobConf]) {
+        logDebug("Re-using user-broadcasted JobConf")
+        conf.asInstanceOf[JobConf]
+      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+        logDebug("Re-using cached JobConf")
+        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+      } else {
+        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+        // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+          logDebug("Creating new JobConf and caching it for later re-use")
+          val newJobConf = new JobConf(conf)
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+          newJobConf
+        }
+      }
     }
   }
 

@@ -231,7 +251,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
-  /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
   /**
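Taken out of the RDD machinery, the pattern this diff introduces is small. The following is a minimal standalone sketch, assuming Hadoop is on the classpath; `ConfCloneSketch`, `cloneForTask`, and the lock field are illustrative stand-ins for `getJobConf()` and `HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK`, not code from the commit.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf

    object ConfCloneSketch {
      // Stand-in for HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK: Configuration's
      // constructor is not thread-safe (SPARK-1097 / HADOOP-10456), so all
      // construction is serialized through this lock.
      private val constructorLock = new Object()

      // Stand-in for getJobConf() with spark.hadoop.cloneConf=true: give each
      // task its own copy so a writer in one task cannot corrupt a concurrent
      // reader in another (SPARK-2546).
      def cloneForTask(shared: Configuration): JobConf =
        constructorLock.synchronized {
          new JobConf(shared)
        }
    }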

docs/configuration.md

Lines changed: 9 additions & 0 deletions
@@ -495,6 +495,15 @@ Apart from these, the following properties are also available, and may be useful
     output directories. We recommend that users do not disable this except if trying to achieve compatibility with
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
 </tr>
+<tr>
+  <td><code>spark.hadoop.cloneConf</code></td>
+  <td>false</td>
+  <td>If set to true, clones a new Hadoop <code>Configuration</code> object for each task. This
+    option should be enabled to work around <code>Configuration</code> thread-safety issues (see
+    <a href="https://issues.apache.org/jira/browse/SPARK-2546">SPARK-2546</a> for more details).
+    This is disabled by default in order to avoid unexpected performance regressions for jobs that
+    are not affected by these issues.</td>
+</tr>
 </table>
 
 #### Networking
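As with other Spark properties, the flag need not be hard-coded in the application: where the deployment supports it, it can also be supplied at launch time, for example via `spark-submit --conf spark.hadoop.cloneConf=true` or a `spark.hadoop.cloneConf true` line in `conf/spark-defaults.conf`.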
