@@ -84,6 +84,27 @@ class NewHadoopRDD[K, V](
8484
8585 @ transient protected val jobId = new JobID (jobTrackerId, id)
8686
87+ private val shouldCloneJobConf = sparkContext.conf.getBoolean(" spark.hadoop.cloneConf" , false )
88+
89+ protected def getConf : Configuration = {
90+ val conf : Configuration = confBroadcast.value.value
91+ if (shouldCloneJobConf) {
92+ // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
93+ // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This
94+ // problem occurs somewhat rarely because most jobs treat the configuration as though it's
95+ // immutable. One solution, implemented here, is to clone the Configuration object.
96+ // Unfortunately, this clone can be very expensive. To avoid unexpected performance regressions
97+ // for workloads and Hadoop versions that do not suffer from these thread-safety issues, this
98+ // cloning is disabled by default.
99+ NewHadoopRDD .CONFIGURATION_INSTANTIATION_LOCK .synchronized {
100+ logDebug(" Cloning Hadoop Configuration" )
101+ new Configuration (conf)
102+ }
103+ } else {
104+ conf
105+ }
106+ }
107+
87108 override def getPartitions : Array [Partition ] = {
88109 val inputFormat = inputFormatClass.newInstance
89110 inputFormat match {
@@ -104,7 +125,7 @@ class NewHadoopRDD[K, V](
104125 val iter = new Iterator [(K , V )] {
105126 val split = theSplit.asInstanceOf [NewHadoopPartition ]
106127 logInfo(" Input split: " + split.serializableHadoopSplit)
107- val conf = confBroadcast.value.value
128+ val conf = getConf
108129
109130 val inputMetrics = context.taskMetrics
110131 .getInputMetricsForReadMethod(DataReadMethod .Hadoop )
@@ -230,11 +251,15 @@ class NewHadoopRDD[K, V](
230251 super .persist(storageLevel)
231252 }
232253
233-
234- def getConf : Configuration = confBroadcast.value.value
235254}
236255
237256private [spark] object NewHadoopRDD {
257+ /**
258+ * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
259+ * Therefore, we synchronize on this lock before calling new Configuration().
260+ */
261+ val CONFIGURATION_INSTANTIATION_LOCK = new Object ()
262+
238263 /**
239264 * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD ]], but passes in an InputSplit to
240265 * the given function rather than the index of the partition.
@@ -268,12 +293,13 @@ private[spark] class WholeTextFileRDD(
268293
269294 override def getPartitions : Array [Partition ] = {
270295 val inputFormat = inputFormatClass.newInstance
296+ val conf = getConf
271297 inputFormat match {
272298 case configurable : Configurable =>
273- configurable.setConf(getConf )
299+ configurable.setConf(conf )
274300 case _ =>
275301 }
276- val jobContext = newJobContext(getConf , jobId)
302+ val jobContext = newJobContext(conf , jobId)
277303 inputFormat.setMinPartitions(jobContext, minPartitions)
278304 val rawSplits = inputFormat.getSplits(jobContext).toArray
279305 val result = new Array [Partition ](rawSplits.size)
0 commit comments