From d41bfe09f1a32d6fff3d084b8f5452f615397744 Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Sat, 18 Jun 2016 00:27:22 +0900
Subject: [PATCH 1/2] Force loading of HdfsConfiguration

---
 .../scala/org/apache/spark/deploy/SparkHadoopUtil.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index bb1793d451df..8f4f69a5d14b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -421,6 +421,13 @@ object SparkHadoopUtil {
 
   val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
 
+  // Load HdfsConfiguration into the class loader so that hdfs-site.xml is
+  // picked up as a default configuration resource. Otherwise, some
+  // HDFS-related configurations are not shipped to executors, which can
+  // cause an UnknownHostException when NameNode HA is enabled.
+  // See SPARK-11227 for more details.
+  Utils.classForName("org.apache.hadoop.hdfs.HdfsConfiguration")
+
   /**
    * Number of records to update input metrics when reading from HadoopRDDs.
    *
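Note on the mechanism patch 1 relies on: HdfsConfiguration's static initializer registers hdfs-default.xml and hdfs-site.xml as default Configuration resources, so merely loading the class changes every Configuration created afterwards. The standalone sketch below (not part of the patch; it assumes hadoop-hdfs and an hdfs-site.xml defining dfs.nameservices are on the classpath) shows that side effect:

  import org.apache.hadoop.conf.Configuration

  object HdfsConfLoadCheck {
    def main(args: Array[String]): Unit = {
      // dfs.nameservices is normally defined only in hdfs-site.xml (HA setups).
      val before = new Configuration()
      println(s"before: ${before.get("dfs.nameservices")}") // expected: null

      // Trigger the static initializer, as the patch does via Utils.classForName.
      Class.forName("org.apache.hadoop.hdfs.HdfsConfiguration")

      // Configurations created from now on include hdfs-site.xml by default.
      val after = new Configuration()
      println(s"after: ${after.get("dfs.nameservices")}") // expected: the HA nameservice
    }
  }
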
From fca2b4a319e79e14f9c8975d337b47fdaeac8e5e Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Mon, 25 Jul 2016 19:03:02 +0900
Subject: [PATCH 2/2] Modify SparkContext to force loading of hdfs-site.xml

---
 .../scala/org/apache/spark/SparkContext.scala | 22 ++++++++++++++++++-
 .../apache/spark/deploy/SparkHadoopUtil.scala |  7 ------
 2 files changed, 21 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6d7f05d21710..dbbed51a0e91 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -35,7 +35,7 @@ import scala.util.control.NonFatal
 import com.google.common.collect.MapMaker
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
   FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
@@ -961,6 +961,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to force loading of hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.getLocal(conf)
+
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
@@ -981,6 +986,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to force loading of hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.get(new URI(path), hadoopConfiguration)
+
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
@@ -1065,6 +1075,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to force loading of hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.get(new URI(path), hadoopConfiguration)
+
     // The call to NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = NewHadoopJob.getInstance(conf)
@@ -1099,6 +1114,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       kClass: Class[K],
       vClass: Class[V]): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to force loading of hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.getLocal(conf)
+
     // Add necessary security credentials to the JobConf. Required to access secure HDFS.
     val jconf = new JobConf(conf)
     SparkHadoopUtil.get.addCredentials(jconf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 70d217ace922..90c71cc6cfab 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -421,13 +421,6 @@ object SparkHadoopUtil {
 
   val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
 
-  // Load HdfsConfiguration into the class loader so that hdfs-site.xml is
-  // picked up as a default configuration resource. Otherwise, some
-  // HDFS-related configurations are not shipped to executors, which can
-  // cause an UnknownHostException when NameNode HA is enabled.
-  // See SPARK-11227 for more details.
-  Utils.classForName("org.apache.hadoop.hdfs.HdfsConfiguration")
-
   /**
    * Number of records to update input metrics when reading from HadoopRDDs.
    *
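How the SparkContext hack takes effect (a reading of the patch, not stated in it): any FileSystem.get or FileSystem.getLocal call triggers Hadoop's ServiceLoader-based file system discovery, which loads DistributedFileSystem, whose static initializer runs HdfsConfiguration.init(). That registers hdfs-site.xml as a default resource before the Hadoop configuration is serialized and broadcast, so executors can resolve HA logical URIs. A minimal driver program exercising the fixed path ("mycluster" is a hypothetical HA nameservice, i.e. the value of dfs.nameservices in hdfs-site.xml):

  import org.apache.spark.{SparkConf, SparkContext}

  object HaReadExample {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext(new SparkConf().setAppName("ha-read"))
      // "hdfs://mycluster/..." uses the logical nameservice, not a real host.
      // Without hdfs-site.xml in the broadcast configuration, executors cannot
      // map "mycluster" to the active NameNode and throw UnknownHostException.
      val lines = sc.textFile("hdfs://mycluster/data/input.txt")
      println(s"line count: ${lines.count()}")
      sc.stop()
    }
  }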