diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index a4817b3cf770d..5d0f1dcc88097 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -156,7 +156,7 @@ private[python] object PythonHadoopUtil {
    * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
    */
   def mapToConf(map: java.util.Map[String, String]): Configuration = {
-    val conf = new Configuration()
+    val conf = new Configuration(false)
     map.asScala.foreach { case (k, v) => conf.set(k, v) }
     conf
   }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 5b80e149b38ac..6bcfcc3da070a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -333,7 +333,7 @@ private[spark] object PythonRDD extends Logging {
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
       batchSize: Int): JavaRDD[Array[Byte]] = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val conf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
@@ -402,7 +402,7 @@ private[spark] object PythonRDD extends Logging {
       valueConverterClass: String,
       confAsMap: java.util.HashMap[String, String],
       batchSize: Int): JavaRDD[Array[Byte]] = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val conf = getMergedConf(confAsMap, sc.hadoopConfiguration())
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClass, keyClass, valueClass, conf)
@@ -618,7 +618,7 @@ private[spark] object PythonRDD extends Logging {
       keyConverterClass: String,
       valueConverterClass: String,
       useNewAPI: Boolean): Unit = {
-    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val conf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
     val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
       keyConverterClass, valueConverterClass, new JavaToWritableConverter)
     if (useNewAPI) {
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
index e2ec50fb1f172..aae5fb002e1e8 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
@@ -17,16 +17,42 @@
 
 package org.apache.spark.api.python
 
-import java.io.{ByteArrayOutputStream, DataOutputStream}
+import java.io.{ByteArrayOutputStream, DataOutputStream, File}
 import java.net.{InetAddress, Socket}
 import java.nio.charset.StandardCharsets
+import java.util
 
+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer}
+import org.apache.spark.util.Utils
+
+class PythonRDDSuite extends SparkFunSuite with LocalSparkContext {
 
-class PythonRDDSuite extends SparkFunSuite {
+  var tempDir: File = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    tempDir = Utils.createTempDir()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      Utils.deleteRecursively(tempDir)
+    } finally {
+      super.afterAll()
+    }
+  }
 
   test("Writing large strings to the worker") {
     val input: List[String] = List("a"*100000)
@@ -65,4 +91,59 @@ class PythonRDDSuite extends SparkFunSuite {
       throw new Exception("exception within handleConnection")
     }
   }
+
+  test("mapToConf should not load defaults") {
+    val map = Map("key" -> "value")
+    val conf = PythonHadoopUtil.mapToConf(map.asJava)
+    assert(conf.size() === map.size)
+    assert(conf.get("key") === map("key"))
+  }
+
+  test("SparkContext's hadoop configuration should be respected in PythonRDD") {
+    // hadoop conf with default configurations
+    val hadoopConf = new Configuration()
+    assert(hadoopConf.size() > 0)
+    val headEntry = hadoopConf.asScala.head
+    val (firstKey, firstValue) = (headEntry.getKey, headEntry.getValue)
+
+    // pass it to the spark conf with a different value (prefixed by spark.)
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    conf.set("spark.hadoop." + firstKey, "spark." + firstValue)
+
+    sc = new SparkContext(conf)
+    val outDir = new File(tempDir, "output").getAbsolutePath
+    // write output as HadoopRDD's input
+    sc.makeRDD(1 to 1000, 10).saveAsTextFile(outDir)
+
+    val javaSparkContext = new JavaSparkContext(sc)
+    val confMap = new util.HashMap[String, String]()
+    // set the input path in the job conf
+    confMap.put(FileInputFormat.INPUT_DIR, outDir)
+
+    val pythonRDD = PythonRDD.hadoopRDD(
+      javaSparkContext,
+      classOf[TextInputFormat].getCanonicalName,
+      classOf[LongWritable].getCanonicalName,
+      classOf[Text].getCanonicalName,
+      null,
+      null,
+      confMap,
+      0
+    )
+
+    @tailrec
+    def getRootRDD(rdd: RDD[_]): RDD[_] = {
+      rdd.dependencies match {
+        case Nil => rdd
+        case dependency :: _ => getRootRDD(dependency.rdd)
+      }
+    }
+
+    // retrieve the HadoopRDD, since it is the root RDD
+    val hadoopRDD = getRootRDD(pythonRDD).asInstanceOf[HadoopRDD[_, _]]
+    val jobConf = hadoopRDD.getConf
+    // the jobConf passed to HadoopRDD should contain SparkContext's hadoop items rather than
+    // the default configs in the client's Configuration
+    assert(jobConf.get(firstKey) === "spark." + firstValue)
+  }
 }
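
Note: the call sites above switch from PythonHadoopUtil.mapToConf(confAsMap) to a getMergedConf helper that already exists in PythonRDD and is not shown in this patch. The following is only an illustrative sketch of the merge semantics the change relies on; the object and method names are made up for the example and are not the actual helper.

// Illustrative sketch only (not part of the patch): per-job entries coming from
// the Python side are overlaid on top of the SparkContext's Hadoop configuration,
// so job-level keys win on conflict while cluster-level settings remain visible.
import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration

object MergedConfSketch {
  def mergedConf(confAsMap: ju.Map[String, String], baseConf: Configuration): Configuration = {
    // copy the base conf so sc.hadoopConfiguration is never mutated by a single job
    val merged = new Configuration(baseConf)
    confAsMap.asScala.foreach { case (k, v) => merged.set(k, v) }
    merged
  }
}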