@@ -156,7 +156,7 @@ private[python] object PythonHadoopUtil {
* Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
*/
def mapToConf(map: java.util.Map[String, String]): Configuration = {
-val conf = new Configuration()
+val conf = new Configuration(false)
@dongjoon-hyun (Member) commented on Jul 3, 2019:
I'm wondering whether this breaks anything. Did you run the UTs locally?

Author (Contributor) replied:
Internally this is only called from PythonRDD, and I have replaced all of those invocations with the merged SparkContext Hadoop conf, so it shouldn't break anything on the Spark side. I ran the Scala UTs; I haven't run the Python unit tests yet.
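Note that getMergedConf itself is not shown in this hunk. Below is a minimal sketch of what such a helper could look like, assuming the per-job map coming from the Python side is overlaid on a copy of the SparkContext's Hadoop configuration; the name and behavior here are assumptions for illustration, not the verbatim implementation:

```scala
import java.util.{HashMap => JHashMap}

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration

// Sketch only: assumes job-specific entries override the base configuration.
def getMergedConf(
    confAsMap: JHashMap[String, String],
    baseConf: Configuration): Configuration = {
  val merged = new Configuration(baseConf)                       // copy of sc.hadoopConfiguration
  confAsMap.asScala.foreach { case (k, v) => merged.set(k, v) }  // per-job keys win on conflict
  merged
}
```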

map.asScala.foreach { case (k, v) => conf.set(k, v) }
conf
}
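The false flag is the crux of this change: new Configuration(false) skips loading the Hadoop default resources (core-default.xml, core-site.xml), so the returned conf carries only the entries from the map and cannot shadow values merged in from the SparkContext afterwards. A small standalone illustration, not part of the patch:

```scala
import org.apache.hadoop.conf.Configuration

// Loads core-default.xml / core-site.xml from the classpath, so it is non-empty.
val withDefaults = new Configuration()
assert(withDefaults.size() > 0)

// Skips the default resources entirely: only explicitly set keys end up in the conf.
val withoutDefaults = new Configuration(false)
assert(withoutDefaults.size() == 0)
withoutDefaults.set("key", "value")
assert(withoutDefaults.size() == 1)
```

This is exactly the behavior the new "mapToConf should not load defaults" test in PythonRDDSuite asserts.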
@@ -333,7 +333,7 @@ private[spark] object PythonRDD extends Logging {
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String],
batchSize: Int): JavaRDD[Array[Byte]] = {
-val conf = PythonHadoopUtil.mapToConf(confAsMap)
+val conf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
@@ -402,7 +402,7 @@ private[spark] object PythonRDD extends Logging {
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String],
batchSize: Int): JavaRDD[Array[Byte]] = {
-val conf = PythonHadoopUtil.mapToConf(confAsMap)
+val conf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
@@ -618,7 +618,7 @@ private[spark] object PythonRDD extends Logging {
keyConverterClass: String,
valueConverterClass: String,
useNewAPI: Boolean): Unit = {
-val conf = PythonHadoopUtil.mapToConf(confAsMap)
+val conf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
keyConverterClass, valueConverterClass, new JavaToWritableConverter)
if (useNewAPI) {
@@ -17,16 +17,42 @@

package org.apache.spark.api.python

-import java.io.{ByteArrayOutputStream, DataOutputStream}
+import java.io.{ByteArrayOutputStream, DataOutputStream, File}
import java.net.{InetAddress, Socket}
import java.nio.charset.StandardCharsets
+import java.util

+import scala.annotation.tailrec
+import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration

-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer}
+import org.apache.spark.util.Utils

-class PythonRDDSuite extends SparkFunSuite {
+class PythonRDDSuite extends SparkFunSuite with LocalSparkContext {

var tempDir: File = _

override def beforeAll(): Unit = {
super.beforeAll()
tempDir = Utils.createTempDir()
}

override def afterAll(): Unit = {
try {
Utils.deleteRecursively(tempDir)
} finally {
super.afterAll()
}
}

test("Writing large strings to the worker") {
val input: List[String] = List("a"*100000)
@@ -65,4 +91,59 @@ class PythonRDDSuite extends SparkFunSuite {
throw new Exception("exception within handleConnection")
}
}

test("mapToConf should not load defaults") {
val map = Map("key" -> "value")
val conf = PythonHadoopUtil.mapToConf(map.asJava)
assert(conf.size() === map.size)
assert(conf.get("key") === map("key"))
}

test("SparkContext's hadoop configuration should be respected in PythonRDD") {
// hadoop conf with default configurations
val hadoopConf = new Configuration()
assert(hadoopConf.size() > 0)
val headEntry = hadoopConf.asScala.head
val (firstKey, firstValue) = (headEntry.getKey, headEntry.getValue)

// set the same key in SparkConf with a different value (prefixed with "spark.")
val conf = new SparkConf().setAppName("test").setMaster("local")
conf.set("spark.hadoop." + firstKey, "spark." + firstValue)

sc = new SparkContext(conf)
val outDir = new File(tempDir, "output").getAbsolutePath
// write output as HadoopRDD's input
sc.makeRDD(1 to 1000, 10).saveAsTextFile(outDir)

val javaSparkContext = new JavaSparkContext(sc)
val confMap = new util.HashMap[String, String]()
// set input path in job conf
confMap.put(FileInputFormat.INPUT_DIR, outDir)

val pythonRDD = PythonRDD.hadoopRDD(
javaSparkContext,
classOf[TextInputFormat].getCanonicalName,
classOf[LongWritable].getCanonicalName,
classOf[Text].getCanonicalName,
null,
null,
confMap,
0
)

@tailrec
def getRootRDD(rdd: RDD[_]): RDD[_] = {
rdd.dependencies match {
case Nil => rdd
case dependency :: _ => getRootRDD(dependency.rdd)
}
}

// retrieve the HadoopRDD, which is the root RDD of the lineage
val hadoopRDD = getRootRDD(pythonRDD).asInstanceOf[HadoopRDD[_, _]]
val jobConf = hadoopRDD.getConf
// the JobConf passed to HadoopRDD should contain SparkContext's Hadoop entries rather than
// the defaults loaded in the client's Configuration
assert(jobConf.get(firstKey) === "spark." + firstValue)
}
}
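A note on the mechanism the second test above leans on: SparkConf entries prefixed with spark.hadoop. are copied, with the prefix stripped, into SparkContext#hadoopConfiguration. A minimal sketch with a made-up key (not taken from the diff):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// "example.key" is a hypothetical property used only for illustration.
val conf = new SparkConf()
  .setMaster("local")
  .setAppName("spark-hadoop-prefix-demo")
  .set("spark.hadoop.example.key", "example-value")

val sc = new SparkContext(conf)
try {
  // The "spark.hadoop." prefix is stripped when the Hadoop configuration is built,
  // so the value is visible to everything that reads sc.hadoopConfiguration,
  // including the merged conf that PythonRDD.hadoopRDD now hands to HadoopRDD.
  assert(sc.hadoopConfiguration.get("example.key") == "example-value")
} finally {
  sc.stop()
}
```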