Commit 59b03e0

Fixes corresponding to Reynolds feedback comments
1 parent 277b4a3 commit 59b03e0

File tree (4 files changed: +43 -32 lines)

core/src/main/scala/org/apache/spark/SparkContext.scala
core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
core/src/main/scala/org/apache/spark/util/Utils.scala

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 15 additions & 15 deletions
@@ -385,14 +385,14 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int,
-      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
-      ): RDD[(K, V)] = {
+  def hadoopFile[K, V, F <: InputFormat[K, V]]
+      (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
-        fm.runtimeClass.asInstanceOf[Class[F]],
-        km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]],
-        minSplits,
+      fm.runtimeClass.asInstanceOf[Class[F]],
+      km.runtimeClass.asInstanceOf[Class[K]],
+      vm.runtimeClass.asInstanceOf[Class[V]],
+      minSplits,
       cloneKeyValues = cloneKeyValues)
   }

@@ -409,15 +409,15 @@ class SparkContext(
     hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)

   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String,
-      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
-      ): RDD[(K, V)] = {
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
+      (path: String, cloneKeyValues: Boolean = true)
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
-        path,
-        fm.runtimeClass.asInstanceOf[Class[F]],
-        km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]],
-        cloneKeyValues = cloneKeyValues)
+      path,
+      fm.runtimeClass.asInstanceOf[Class[F]],
+      km.runtimeClass.asInstanceOf[Class[K]],
+      vm.runtimeClass.asInstanceOf[Class[V]],
+      cloneKeyValues = cloneKeyValues)
   }

   /**
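The signature change only regroups parameters: explicit arguments move into the first list, the implicit ClassTags into the second, so call sites compile unchanged because the ClassTags are still inferred from the type arguments. A minimal call-site sketch (not part of the commit), assuming a live SparkContext named sc and a made-up HDFS path:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// ClassTags for K, V and F are supplied implicitly from the type arguments.
val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/input", minSplits = 4)
// cloneKeyValues defaults to true; pass false to skip per-record copies when
// records are consumed immediately and never cached.
val noClone = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/input", 4, cloneKeyValues = false)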

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 7 additions & 4 deletions
@@ -22,6 +22,7 @@ import java.io.EOFException
 import scala.reflect.ClassTag

 import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -91,7 +92,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits, cloneKeyValues)
+      minSplits,
+      cloneKeyValues)
   }

   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -162,10 +164,10 @@ class HadoopRDD[K: ClassTag, V: ClassTag](

       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
-
       val key: K = reader.createKey()
+      val keyCloneFunc = cloneWritables[K](getConf)
       val value: V = reader.createValue()
-
+      val valueCloneFunc = cloneWritables[V](getConf)
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -174,7 +176,8 @@
           finished = true
         }
         if (cloneKeyValues) {
-          (cloneWritables(key, getConf), cloneWritables(value, getConf))
+          (keyCloneFunc(key.asInstanceOf[Writable]),
+            valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)
         }
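For context on why cloneKeyValues exists at all: Hadoop RecordReaders reuse the same key/value Writable instances across calls to next, so holding references without copying aliases every record to the last one read. A self-contained sketch of that reuse hazard, with hypothetical names and only Hadoop's Text class assumed:

import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.io.Text

object WritableReuseSketch {
  def main(args: Array[String]): Unit = {
    val buffer = new Text()             // stands in for the reader's reused value object
    val aliased = ArrayBuffer[Text]()
    val cloned = ArrayBuffer[Text]()
    for (s <- Seq("a", "b", "c")) {
      buffer.set(s)                     // the reader overwrites the buffer in place
      aliased += buffer                 // no copy: all entries point at one object
      cloned += new Text(buffer)        // defensive copy, as cloneKeyValues = true does
    }
    println(aliased.map(_.toString))    // ArrayBuffer(c, c, c)
    println(cloned.map(_.toString))     // ArrayBuffer(a, b, c)
  }
}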

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 7 additions & 4 deletions
@@ -92,7 +92,8 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](

       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback(() => close())
-
+      val keyCloneFunc = cloneWritables[K](conf)
+      val valueCloneFunc = cloneWritables[V](conf)
       var havePair = false
       var finished = false

@@ -112,9 +113,11 @@
         val key = reader.getCurrentKey
         val value = reader.getCurrentValue
         if (cloneKeyValues) {
-          (cloneWritables(key, conf), cloneWritables(value, conf))
-        } else
-          (key, value)
+          (keyCloneFunc(key.asInstanceOf[Writable]),
+            valueCloneFunc(value.asInstanceOf[Writable]))
+        } else {
+          (key, value)
+        }
       }

       private def close() {
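NewHadoopRDD gets the mirror-image change. For completeness, a hedged call-site sketch for the new-API variant, again assuming a live SparkContext named sc; note the InputFormat now comes from the org.apache.hadoop.mapreduce package rather than org.apache.hadoop.mapred:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// One explicit parameter list (path, cloneKeyValues); implicits in the second.
val records = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/input")
val noClone = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/input", cloneKeyValues = false)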

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 14 additions & 9 deletions
@@ -51,15 +51,20 @@ private[spark] object Utils extends Logging {
    * intention is to optimize, for example for NullWritable there is no need and for Long, int and
    * String creating a new object with value set would be faster.
    */
-  def cloneWritables[T: ClassTag](obj: T, conf: Configuration): T = {
-    val cloned = classTag[T] match {
-      case ClassTag(_: Text) => new Text(obj.asInstanceOf[Text].getBytes)
-      case ClassTag(_: LongWritable) => new LongWritable(obj.asInstanceOf[LongWritable].get)
-      case ClassTag(_: IntWritable) => new IntWritable(obj.asInstanceOf[IntWritable].get)
-      case ClassTag(_: NullWritable) => obj // TODO: should we clone this ?
-      case _ => WritableUtils.clone(obj.asInstanceOf[Writable], conf) // slower way of cloning.
-    }
-    cloned.asInstanceOf[T]
+  def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
+    val cloneFunc = classTag[T] match {
+      case ClassTag(_: Text) =>
+        (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
+      case ClassTag(_: LongWritable) =>
+        (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
+      case ClassTag(_: IntWritable) =>
+        (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
+      case ClassTag(_: NullWritable) =>
+        (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
+      case _ =>
+        (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
+    }
+    cloneFunc
   }

   /** Serialize an object using Java serialization */
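The refactored cloneWritables moves the ClassTag dispatch out of the per-record path: the match runs once when the function is built, and each record pays only for a function call plus the copy itself. A standalone sketch of the same dispatch-once shape, with hypothetical names (a runtime-class comparison is used instead of the ClassTag extractor to keep the sketch version-agnostic):

import scala.reflect.{classTag, ClassTag}
import org.apache.hadoop.io.{IntWritable, Writable}

object DispatchOnceSketch {
  // Decide how to copy once, based on the static type; return a cheap per-record function.
  def copierFor[T: ClassTag]: Writable => T = {
    if (classTag[T].runtimeClass == classOf[IntWritable])
      (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
    else
      (w: Writable) => w.asInstanceOf[T]  // fallback: identity (no copy) in this sketch
  }

  def main(args: Array[String]): Unit = {
    val copy = copierFor[IntWritable]     // resolved once, e.g. per partition
    val buffer = new IntWritable(0)
    val out = (1 to 3).map { i => buffer.set(i); copy(buffer) } // per record: just a call
    println(out)                          // Vector(1, 2, 3)
  }
}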
