
Commit 1a4a1d6

Address @mateiz style comments
1 parent 01e0813 commit 1a4a1d6

3 files changed (+28, -25 lines)


core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 4 additions & 6 deletions
@@ -23,15 +23,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
 
 
-/**
- * Utilities for working with Python objects -> Hadoop-related objects
- */
+/** Utilities for working with Python objects -> Hadoop-related objects */
 private[python] object PythonHadoopUtil {
 
   /**
-   * Convert a Map of properties to a [[org.apache.hadoop.conf.Configuration]]
+   * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
    */
-  def mapToConf(map: java.util.Map[String, String]) = {
+  def mapToConf(map: java.util.Map[String, String]): Configuration = {
     import collection.JavaConversions._
     val conf = new Configuration()
     map.foreach{ case (k, v) => conf.set(k, v) }
@@ -42,7 +40,7 @@ private[python] object PythonHadoopUtil {
    * Merges two configurations, returns a copy of left with keys from right overwriting
    * any matching keys in left
    */
-  def mergeConfs(left: Configuration, right: Configuration) = {
+  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
     import collection.JavaConversions._
     val copy = new Configuration(left)
     right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
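For orientation, a minimal sketch of how the two methods above could be exercised. The caller object, the property key, and the package placement are illustrative assumptions (PythonHadoopUtil is private[python], so a real caller would have to live in that package):

package org.apache.spark.api.python

import org.apache.hadoop.conf.Configuration

// Hypothetical caller; placed in this package only because PythonHadoopUtil is private[python]
object ConfMergeExample {
  def main(args: Array[String]) {
    // Properties arrive as a java.util.Map[String, String], as PySpark would pass them
    val props = new java.util.HashMap[String, String]()
    props.put("mapreduce.input.fileinputformat.inputdir", "/tmp/input")

    // java.util.Map -> Configuration, then overlay onto a base Configuration;
    // on conflicting keys, the right-hand side wins
    val overlay: Configuration = PythonHadoopUtil.mapToConf(props)
    val merged: Configuration = PythonHadoopUtil.mergeConfs(new Configuration(), overlay)
    println(merged.get("mapreduce.input.fileinputformat.inputdir"))
  }
}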

core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 16 additions & 16 deletions
@@ -24,11 +24,8 @@ import scala.util.Success
 import scala.util.Failure
 import net.razorvine.pickle.Pickler
 
-/**
- * Utilities for serialization / deserialization between Python and Java, using MsgPack.
- * Also contains utilities for converting [[org.apache.hadoop.io.Writable]] ->
- * Scala objects and primitives
- */
+
+/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
 private[python] object SerDeUtil extends Logging {
 
   /**
@@ -37,12 +34,12 @@ private[python] object SerDeUtil extends Logging {
    * representation is serialized
    */
  def rddToPython[K, V](rdd: RDD[(K, V)]): RDD[Array[Byte]] = {
-    rdd.mapPartitions{ iter =>
+    rdd.mapPartitions { iter =>
       val pickle = new Pickler
       var keyFailed = false
       var valueFailed = false
       var firstRecord = true
-      iter.map{ case (k, v) =>
+      iter.map { case (k, v) =>
         if (firstRecord) {
           Try {
             pickle.dumps(Array(k, v))
@@ -57,29 +54,32 @@ private[python] object SerDeUtil extends Logging {
               }
               (kt, vt) match {
                 case (Failure(kf), Failure(vf)) =>
-                  log.warn(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
+                  logWarning(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
                     Error: ${kf.getMessage}""")
-                  log.warn(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
+                  logWarning(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
                     Error: ${vf.getMessage}""")
                   keyFailed = true
                   valueFailed = true
                 case (Failure(kf), _) =>
-                  log.warn(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
+                  logWarning(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
                     Error: ${kf.getMessage}""")
                   keyFailed = true
                 case (_, Failure(vf)) =>
-                  log.warn(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
+                  logWarning(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
                     Error: ${vf.getMessage}""")
                   valueFailed = true
               }
           }
           firstRecord = false
         }
-        (keyFailed, valueFailed) match {
-          case (true, true) => pickle.dumps(Array(k.toString, v.toString))
-          case (true, false) => pickle.dumps(Array(k.toString, v))
-          case (false, true) => pickle.dumps(Array(k, v.toString))
-          case (false, false) => pickle.dumps(Array(k, v))
+        if (keyFailed && valueFailed) {
+          pickle.dumps(Array(k.toString, v.toString))
+        } else if (keyFailed) {
+          pickle.dumps(Array(k.toString, v))
+        } else if (!keyFailed && valueFailed) {
+          pickle.dumps(Array(k, v.toString))
+        } else {
+          pickle.dumps(Array(k, v))
         }
       }
     }
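The rewritten branch keeps the same fallback rule as the removed pattern match: pickle the raw key/value when Pyrolite can handle it, otherwise fall back to its toString form. A standalone sketch of just that rule follows; the example objects are assumptions, and the real code only probes the first record of each partition rather than every pair:

import scala.util.Try
import net.razorvine.pickle.Pickler

object PickleFallbackSketch {
  def main(args: Array[String]) {
    val pickle = new Pickler
    val key: AnyRef = "some-key"   // a String, which Pyrolite can pickle
    val value: AnyRef = new Object // an arbitrary object that Pyrolite may refuse to pickle

    // Probe each side once to decide whether a toString fallback is needed
    val keyFailed = Try(pickle.dumps(key)).isFailure
    val valueFailed = Try(pickle.dumps(value)).isFailure

    val bytes: Array[Byte] =
      if (keyFailed && valueFailed) {
        pickle.dumps(Array(key.toString, value.toString))
      } else if (keyFailed) {
        pickle.dumps(Array(key.toString, value))
      } else if (valueFailed) {
        pickle.dumps(Array(key, value.toString))
      } else {
        pickle.dumps(Array(key, value))
      }

    println(s"pickled ${bytes.length} bytes (keyFailed=$keyFailed, valueFailed=$valueFailed)")
  }
}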

core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala

Lines changed: 8 additions & 3 deletions
@@ -58,9 +58,15 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
  * This object contains method to generate SequenceFile test data and write it to a
  * given directory (probably a temp directory)
  */
-object WriteInputFormatTestDataGenerator extends App {
+object WriteInputFormatTestDataGenerator {
   import SparkContext._
 
+  def main(args: Array[String]) {
+    val path = args(0)
+    val sc = new JavaSparkContext("local[4]", "test-writables")
+    generateData(path, sc)
+  }
+
   def generateData(path: String, jsc: JavaSparkContext) {
     val sc = jsc.sc
 
@@ -99,8 +105,7 @@ object WriteInputFormatTestDataGenerator extends App {
     sc.parallelize(data, numSlices = 2)
       .map{ case (k, v) =>
         (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
-      }
-      .saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
+    }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)
 
     // Create test data for MapWritable, with keys DoubleWritable and values Text
     val mapData = Seq(
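With extends App replaced by an explicit main, the generator can also be driven programmatically, for instance from a test. A minimal sketch, assuming the object stays publicly accessible; the output path and the caller object name are illustrative:

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.python.WriteInputFormatTestDataGenerator

object GenerateTestDataExample {
  def main(args: Array[String]) {
    // Same setup that WriteInputFormatTestDataGenerator.main performs, with args(0) as the path
    val sc = new JavaSparkContext("local[4]", "test-writables")
    try {
      WriteInputFormatTestDataGenerator.generateData("/tmp/sequence-file-test-data", sc)
    } finally {
      sc.stop()
    }
  }
}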
