@@ -216,17 +216,17 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     val reducePartition = (iter: Iterator[(K, V)]) => {
       val map = new JHashMap[K, V]
-      iter.foreach { case (k, v) =>
-        val old = map.get(k)
-        map.put(k, if (old == null) v else func(old, v))
+      iter.foreach { pair =>
+        val old = map.get(pair._1)
+        map.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       Iterator(map)
     } : Iterator[JHashMap[K, V]]
 
     val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
-      m2.foreach { case (k, v) =>
-        val old = m1.get(k)
-        m1.put(k, if (old == null) v else func(old, v))
+      m2.foreach { pair =>
+        val old = m1.get(pair._1)
+        m1.put(pair._1, if (old == null) pair._2 else func(old, pair._2))
       }
       m1
     } : JHashMap[K, V]
@@ -401,9 +401,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs; w <- ws) yield (v, w)
-    }
+    this.cogroup(other, partitioner).flatMapValues( pair =>
+      for (v <- pair._1; w <- pair._2) yield (v, w)
+    )
   }
 
   /**
@@ -413,11 +413,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * partition the output RDD.
    */
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (ws.isEmpty) {
-        vs.map(v => (v, None))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._2.isEmpty) {
+        pair._1.map(v => (v, None))
       } else {
-        for (v <- vs; w <- ws) yield (v, Some(w))
+        for (v <- pair._1; w <- pair._2) yield (v, Some(w))
       }
     }
   }
@@ -430,11 +430,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    */
   def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
       : RDD[(K, (Option[V], W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (vs.isEmpty) {
-        ws.map(w => (None, w))
+    this.cogroup(other, partitioner).flatMapValues { pair =>
+      if (pair._1.isEmpty) {
+        pair._2.map(w => (None, w))
       } else {
-        for (v <- vs; w <- ws) yield (Some(v), w)
+        for (v <- pair._1; w <- pair._2) yield (Some(v), w)
       }
     }
   }
@@ -535,7 +535,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val data = self.collect()
     val map = new mutable.HashMap[K, V]
     map.sizeHint(data.length)
-    data.foreach { case (k, v) => map.put(k, v) }
+    data.foreach { pair => map.put(pair._1, pair._2) }
     map
   }
 
@@ -572,10 +572,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-       (vs.asInstanceOf[Seq[V]],
-         w1s.asInstanceOf[Seq[W1]],
-         w2s.asInstanceOf[Seq[W2]],
-         w3s.asInstanceOf[Seq[W3]])
+      (vs.asInstanceOf[Seq[V]],
+        w1s.asInstanceOf[Seq[W1]],
+        w2s.asInstanceOf[Seq[W2]],
+        w3s.asInstanceOf[Seq[W3]])
     }
   }
 
@@ -589,8 +589,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, ws) =>
-      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+    cg.mapValues { case Seq(vs, w1s) =>
+      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
     }
   }
 
@@ -606,8 +606,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s) =>
       (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]])
+       w1s.asInstanceOf[Seq[W1]],
+       w2s.asInstanceOf[Seq[W2]])
     }
   }
 
@@ -712,8 +712,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val index = p.getPartition(key)
     val process = (it: Iterator[(K, V)]) => {
       val buf = new ArrayBuffer[V]
-      for ((k, v) <- it if k == key) {
-        buf += v
+      for (pair <- it if pair._1 == key) {
+        buf += pair._2
       }
       buf
     } : Seq[V]
@@ -858,8 +858,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
       try {
         while (iter.hasNext) {
-          val (k, v) = iter.next()
-          writer.write(k, v)
+          val pair = iter.next()
+          writer.write(pair._1, pair._2)
         }
       } finally {
         writer.close(hadoopContext)