Skip to content

Commit be10f8a

Browse files
committed
SPARK-2519 part 2. Remove pattern matching on Tuple2 in critical sections of CoGroupedRDD and PairRDDFunctions
1 parent 30b8d36 commit be10f8a

File tree

1 file changed

+25
-36
lines changed

1 file changed

+25
-36
lines changed

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 25 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
216216

217217
val reducePartition = (iter: Iterator[(K, V)]) => {
218218
val map = new JHashMap[K, V]
219-
iter.foreach { case (k, v) =>
220-
val old = map.get(k)
221-
map.put(k, if (old == null) v else func(old, v))
219+
iter.foreach { pair =>
220+
val old = map.get(pair._1)
221+
map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
222222
}
223223
Iterator(map)
224224
} : Iterator[JHashMap[K, V]]
225225

226226
val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
227-
m2.foreach { case (k, v) =>
228-
val old = m1.get(k)
229-
m1.put(k, if (old == null) v else func(old, v))
227+
m2.foreach { pair =>
228+
val old = m1.get(pair._1)
229+
m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
230230
}
231231
m1
232232
} : JHashMap[K, V]
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
401401
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
402402
*/
403403
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
404-
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
405-
for (v <- vs; w <- ws) yield (v, w)
406-
}
404+
this.cogroup(other, partitioner).flatMapValues( pair =>
405+
for (v <- pair._1; w <- pair._2) yield (v, w)
406+
)
407407
}
408408

409409
/**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
413413
* partition the output RDD.
414414
*/
415415
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
416-
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
417-
if (ws.isEmpty) {
418-
vs.map(v => (v, None))
416+
this.cogroup(other, partitioner).flatMapValues { pair =>
417+
if (pair._2.isEmpty) {
418+
pair._1.map(v => (v, None))
419419
} else {
420-
for (v <- vs; w <- ws) yield (v, Some(w))
420+
for (v <- pair._1; w <- pair._2) yield (v, Some(w))
421421
}
422422
}
423423
}
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
430430
*/
431431
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
432432
: RDD[(K, (Option[V], W))] = {
433-
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
434-
if (vs.isEmpty) {
435-
ws.map(w => (None, w))
433+
this.cogroup(other, partitioner).flatMapValues { pair =>
434+
if (pair._1.isEmpty) {
435+
pair._2.map(w => (None, w))
436436
} else {
437-
for (v <- vs; w <- ws) yield (Some(v), w)
437+
for (v <- pair._1; w <- pair._2) yield (Some(v), w)
438438
}
439439
}
440440
}
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
535535
val data = self.collect()
536536
val map = new mutable.HashMap[K, V]
537537
map.sizeHint(data.length)
538-
data.foreach { case (k, v) => map.put(k, v) }
538+
data.foreach { pair => map.put(pair._1, pair._2) }
539539
map
540540
}
541541

@@ -571,12 +571,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
571571
throw new SparkException("Default partitioner cannot partition array keys.")
572572
}
573573
val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
574-
cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
575-
(vs.asInstanceOf[Seq[V]],
576-
w1s.asInstanceOf[Seq[W1]],
577-
w2s.asInstanceOf[Seq[W2]],
578-
w3s.asInstanceOf[Seq[W3]])
579-
}
574+
cg.mapValues { seq => seq.asInstanceOf[(Seq[V], Seq[W1], Seq[W2], Seq[W3])] }
580575
}
581576

582577
/**
@@ -589,9 +584,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
589584
throw new SparkException("Default partitioner cannot partition array keys.")
590585
}
591586
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
592-
cg.mapValues { case Seq(vs, ws) =>
593-
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
594-
}
587+
cg.mapValues { pair => pair.asInstanceOf[(Seq[V], Seq[W])] }
595588
}
596589

597590
/**
@@ -604,11 +597,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
604597
throw new SparkException("Default partitioner cannot partition array keys.")
605598
}
606599
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
607-
cg.mapValues { case Seq(vs, w1s, w2s) =>
608-
(vs.asInstanceOf[Seq[V]],
609-
w1s.asInstanceOf[Seq[W1]],
610-
w2s.asInstanceOf[Seq[W2]])
611-
}
600+
cg.mapValues { seq => seq.asInstanceOf[(Seq[V], Seq[W1], Seq[W2])] }
612601
}
613602

614603
/**
@@ -712,8 +701,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
712701
val index = p.getPartition(key)
713702
val process = (it: Iterator[(K, V)]) => {
714703
val buf = new ArrayBuffer[V]
715-
for ((k, v) <- it if k == key) {
716-
buf += v
704+
for (pair <- it if pair._1 == key) {
705+
buf += pair._2
717706
}
718707
buf
719708
} : Seq[V]
@@ -858,8 +847,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
858847
val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
859848
try {
860849
while (iter.hasNext) {
861-
val (k, v) = iter.next()
862-
writer.write(k, v)
850+
val pair = iter.next()
851+
writer.write(pair._1, pair._2)
863852
}
864853
} finally {
865854
writer.close(hadoopContext)

0 commit comments

Comments (0)