
Commit 8074208

mingyukim authored and JoshRosen committed
[SPARK-10611] Clone Configuration for each task for NewHadoopRDD
This patch attempts to fix the Hadoop Configuration thread-safety issue for NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.

Author: Mingyu Kim <[email protected]>

Closes #8763 from mingyukim/mkim/SPARK-10611.
1 parent 348d7c9 commit 8074208
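
Since the clone can be expensive, the new code path is gated behind the same "spark.hadoop.cloneConf" flag that SPARK-2546 introduced for HadoopRDD, and it defaults to false. A minimal sketch of opting in from an application, with an illustrative app name and master:

    import org.apache.spark.{SparkConf, SparkContext}

    // Opt in to per-task cloning of the Hadoop Configuration. Off by default
    // because the clone can be expensive (see the comment in getConf below).
    val conf = new SparkConf()
      .setAppName("clone-conf-example") // illustrative name
      .setMaster("local[*]")            // illustrative master
      .set("spark.hadoop.cloneConf", "true")
    val sc = new SparkContext(conf)

With the flag set, every task computing a NewHadoopRDD partition receives its own Configuration copy instead of sharing the broadcast instance.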

File tree

2 files changed (+34, -8 lines)


core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala

Lines changed: 3 additions & 2 deletions
@@ -34,12 +34,13 @@ private[spark] class BinaryFileRDD[T](
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
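
Note the shape of the change: getPartitions now calls getConf exactly once and threads the local val through. With cloning enabled, each getConf call can return a different Configuration copy, so hoisting guarantees that setConf and newJobContext observe the same instance and that at most one clone is paid for. A standalone sketch of the hazard, using a hypothetical accessor that clones on every call:

    import org.apache.hadoop.conf.Configuration

    object HoistedConfExample {
      private val base = new Configuration(false) // skip default resources for the demo

      // Mirrors NewHadoopRDD.getConf with cloning enabled: every call returns a copy.
      def getConf: Configuration = new Configuration(base)

      def main(args: Array[String]): Unit = {
        println(getConf eq getConf) // false: two calls, two independent clones

        val conf = getConf                 // hoisted once, as in the patched code
        conf.set("demo.key", "demo.value") // hypothetical key; the mutation...
        println(conf.get("demo.key"))      // ...is visible only through this instance
      }
    }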

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 31 additions & 6 deletions
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
 
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable. One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive. To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
 
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,11 +250,15 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }
 
-
-  def getConf: Configuration = confBroadcast.value.value
 }
 
 private[spark] object NewHadoopRDD {
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
   /**
    * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
    * the given function rather than the index of the partition.
@@ -268,12 +292,13 @@ private[spark] class WholeTextFileRDD(
 
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
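
The heart of the patch is the clone-under-lock idiom in getConf above: a shared Configuration is unsafe to mutate while another thread reads it, and the Configuration constructor itself is not threadsafe (SPARK-1097, HADOOP-10456), so every clone is taken while holding a single global lock. A self-contained sketch of the same idiom outside Spark; the object and method names are hypothetical:

    import org.apache.hadoop.conf.Configuration

    object CloneUnderLock {
      // All Configuration instantiation goes through one global lock, matching
      // NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK above.
      private val CONFIGURATION_INSTANTIATION_LOCK = new Object()

      // Hand each task a private copy it may mutate freely.
      def cloneConf(base: Configuration): Configuration =
        CONFIGURATION_INSTANTIATION_LOCK.synchronized {
          new Configuration(base)
        }

      def main(args: Array[String]): Unit = {
        val base = new Configuration()
        val threads = (1 to 4).map { i =>
          new Thread(() => {
            val conf = cloneConf(base)      // safe even when tasks clone concurrently
            conf.set("task.id", i.toString) // the mutation stays local to this task
          })
        }
        threads.foreach(_.start())
        threads.foreach(_.join())
      }
    }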
