
Commit ee6e7f9

Merge pull request #359 from ScrapCodes/clone-writables
We clone Hadoop keys and values by default and reuse objects only if asked to. We clone the most common Writable types directly and fall back to WritableUtils.clone otherwise. The intention is to optimize: for NullWritable there is no need to clone at all, and for LongWritable, IntWritable and Text, creating a new object from the underlying value should be faster than a generic copy. An alternative way to do this PR would be to ask separately whether to clone keys and whether to clone values, but the only use case I could think of is when one of them is a NullWritable, which is already handled, so that extra flexibility seemed unnecessary.
2 parents 4216178 + 59b03e0 commit ee6e7f9
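
For illustration only (not part of this commit): a minimal sketch of how a caller might use the new cloneKeyValues flag, assuming an existing SparkContext named sc, a placeholder input path, and the old-API TextInputFormat.

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  // Default: keys and values are cloned, so caching or collecting the records is safe.
  val cloned = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/path/to/input", 2)

  // Opt out of cloning when each record is consumed immediately, accepting that Hadoop
  // may reuse the same Writable instances from one record to the next.
  val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("/path/to/input", 2,
    cloneKeyValues = false).map { case (_, text) => text.toString }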

4 files changed: 106 additions, 49 deletions


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 44 additions & 34 deletions
@@ -349,25 +349,27 @@ class SparkContext(
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
    */
-  def hadoopRDD[K, V](
+  def hadoopRDD[K: ClassTag, V: ClassTag](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V](
+  def hadoopFile[K: ClassTag, V: ClassTag](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -379,7 +381,8 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits,
+      cloneKeyValues)
   }
 
   /**
@@ -390,14 +393,15 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
-      : RDD[(K, V)] = {
+  def hadoopFile[K, V, F <: InputFormat[K, V]]
+      (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
-        fm.runtimeClass.asInstanceOf[Class[F]],
-        km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]],
-        minSplits)
+      fm.runtimeClass.asInstanceOf[Class[F]],
+      km.runtimeClass.asInstanceOf[Class[K]],
+      vm.runtimeClass.asInstanceOf[Class[V]],
+      minSplits,
+      cloneKeyValues = cloneKeyValues)
   }
 
   /**
@@ -408,61 +412,67 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
+    hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
+      (path: String, cloneKeyValues: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
-        path,
-        fm.runtimeClass.asInstanceOf[Class[F]],
-        km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]])
+      path,
+      fm.runtimeClass.asInstanceOf[Class[F]],
+      km.runtimeClass.asInstanceOf[Class[K]],
+      vm.runtimeClass.asInstanceOf[Class[V]],
+      cloneKeyValues = cloneKeyValues)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration,
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
       kClass: Class[K],
-      vClass: Class[V]): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+      vClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String,
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minSplits: Int,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -480,16 +490,16 @@ class SparkContext(
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
    */
-  def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
-      (implicit km: ClassTag[K], vm: ClassTag[V],
+  def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
          kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
     val kc = kcf()
     val vc = vcf()
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
     writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
   }
 

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 20 additions & 9 deletions
@@ -19,7 +19,10 @@ package org.apache.spark.rdd
 
 import java.io.EOFException
 
-import org.apache.hadoop.mapred.FileInputFormat
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -31,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.spark.util.Utils.cloneWritables
 
 
 /**
@@ -62,14 +65,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
  */
-class HadoopRDD[K, V](
+class HadoopRDD[K: ClassTag, V: ClassTag](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
    valueClass: Class[V],
-    minSplits: Int)
+    minSplits: Int,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -78,7 +82,8 @@ class HadoopRDD[K, V](
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int) = {
+      minSplits: Int,
+      cloneKeyValues: Boolean) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -87,7 +92,8 @@ class HadoopRDD[K, V](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits,
+      cloneKeyValues)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -158,18 +164,23 @@ class HadoopRDD[K, V](
 
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
-
       val key: K = reader.createKey()
+      val keyCloneFunc = cloneWritables[K](getConf)
       val value: V = reader.createValue()
-
+      val valueCloneFunc = cloneWritables[V](getConf)
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
         } catch {
           case eof: EOFException =>
            finished = true
         }
-        (key, value)
+        if (cloneKeyValues) {
+          (keyCloneFunc(key.asInstanceOf[Writable]),
+            valueCloneFunc(value.asInstanceOf[Writable]))
+        } else {
+          (key, value)
+        }
       }
 
       override def close() {

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 16 additions & 4 deletions
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables
 
 
 private[spark]
@@ -36,12 +39,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
   override def hashCode(): Int = (41 * (41 + rddId) + index)
 }
 
-class NewHadoopRDD[K, V](
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration)
+    @transient conf: Configuration,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -88,7 +92,8 @@ class NewHadoopRDD[K, V](
 
     // Register an on-task-completion callback to close the input stream.
     context.addOnCompleteCallback(() => close())
-
+    val keyCloneFunc = cloneWritables[K](conf)
+    val valueCloneFunc = cloneWritables[V](conf)
     var havePair = false
     var finished = false
 
@@ -105,7 +110,14 @@ class NewHadoopRDD[K, V](
         throw new java.util.NoSuchElementException("End of stream")
       }
       havePair = false
-      (reader.getCurrentKey, reader.getCurrentValue)
+      val key = reader.getCurrentKey
+      val value = reader.getCurrentValue
+      if (cloneKeyValues) {
+        (keyCloneFunc(key.asInstanceOf[Writable]),
+          valueCloneFunc(value.asInstanceOf[Writable]))
+      } else {
+        (key, value)
+      }
     }
 
     private def close() {

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 26 additions & 2 deletions
@@ -26,23 +26,47 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import org.apache.hadoop.io._
 
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
 import java.nio.ByteBuffer
-import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkException, Logging}
 
 
 /**
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging {
+
+  /**
+   * We try to clone for most common types of writables and we call WritableUtils.clone otherwise
+   * intention is to optimize, for example for NullWritable there is no need and for Long, int and
+   * String creating a new object with value set would be faster.
+   */
+  def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
+    val cloneFunc = classTag[T] match {
+      case ClassTag(_: Text) =>
+        (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
+      case ClassTag(_: LongWritable) =>
+        (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
+      case ClassTag(_: IntWritable) =>
+        (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
+      case ClassTag(_: NullWritable) =>
+        (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
+      case _ =>
+        (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
+    }
+    cloneFunc
+  }
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
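
For illustration only (not part of this commit): a minimal sketch of what the new cloneWritables helper does for a Text value. Utils is private[spark], so code like this would only compile inside a Spark package, and the Configuration here is just a placeholder.

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.io.{Text, Writable}
  import org.apache.spark.util.Utils.cloneWritables

  val conf = new Configuration()
  // Resolve the clone function once (e.g. per partition), then apply it per record.
  val cloneText: Writable => Text = cloneWritables[Text](conf)

  val reused = new Text("record-1")   // stands in for a Writable that Hadoop reuses
  val copy = cloneText(reused)        // independent copy, safe to keep around
  reused.set("record-2")              // mutating the reused object does not affect the copy
  assert(copy.toString == "record-1")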
