diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 6d7f05d21710..dbbed51a0e91 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -35,7 +35,7 @@ import scala.util.control.NonFatal import com.google.common.collect.MapMaker import org.apache.commons.lang3.SerializationUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, @@ -961,6 +961,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() + + // This is a hack to enforce loading hdfs-site.xml. + // See SPARK-11227 for details. + FileSystem.getLocal(conf) + // Add necessary security credentials to the JobConf before broadcasting it. SparkHadoopUtil.get.addCredentials(conf) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions) @@ -981,6 +986,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() + + // This is a hack to enforce loading hdfs-site.xml. + // See SPARK-11227 for details. + FileSystem.get(new URI(path), hadoopConfiguration) + // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) @@ -1065,6 +1075,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope { assertNotStopped() + + // This is a hack to enforce loading hdfs-site.xml. + // See SPARK-11227 for details. + FileSystem.get(new URI(path), hadoopConfiguration) + // The call to NewHadoopJob automatically adds security credentials to conf, // so we don't need to explicitly add them ourselves val job = NewHadoopJob.getInstance(conf) @@ -1099,6 +1114,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli kClass: Class[K], vClass: Class[V]): RDD[(K, V)] = withScope { assertNotStopped() + + // This is a hack to enforce loading hdfs-site.xml. + // See SPARK-11227 for details. + FileSystem.getLocal(conf) + // Add necessary security credentials to the JobConf. Required to access secure HDFS. val jconf = new JobConf(conf) SparkHadoopUtil.get.addCredentials(jconf)