
Commit eabda80

Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
2 parents: 0f890e6 + b235e01

File tree

39 files changed: +670 additions, −106 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ conf/*.cmd
 conf/*.properties
 conf/*.conf
 conf/*.xml
+conf/slaves
 docs/_site
 docs/api
 target/

.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@ log4j.properties
 log4j.properties.template
 metrics.properties.template
 slaves
+slaves.template
 spark-env.sh
 spark-env.cmd
 spark-env.sh.template

File renamed without changes.

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 7 additions & 1 deletion
@@ -18,10 +18,12 @@
 package org.apache.spark
 
 import java.io._
+import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
 import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.concurrent.Await
+import scala.collection.JavaConversions._
 
 import akka.actor._
 import akka.pattern.ask

@@ -84,6 +86,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
  * On the master, it serves as the source of map outputs recorded from ShuffleMapTasks.
  * On the workers, it simply serves as a cache, in which a miss triggers a fetch from the
  * master's corresponding HashMap.
+ *
+ * Note: because mapStatuses is accessed concurrently, subclasses should make sure it's a
+ * thread-safe map.
  */
 protected val mapStatuses: Map[Int, Array[MapStatus]]
 
@@ -339,7 +344,8 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
  * MapOutputTrackerMaster.
  */
 private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) {
-  protected val mapStatuses = new HashMap[Int, Array[MapStatus]]
+  protected val mapStatuses: Map[Int, Array[MapStatus]] =
+    new ConcurrentHashMap[Int, Array[MapStatus]]
 }
 
 private[spark] object MapOutputTracker {
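The switch from HashMap to ConcurrentHashMap matters because worker-side fetch threads read and write mapStatuses concurrently. A minimal standalone sketch (not part of this commit) of the idiom the new code relies on: scala.collection.JavaConversions implicitly wraps a java.util.concurrent.ConcurrentHashMap as a scala.collection.mutable.Map, so a concrete thread-safe map still satisfies the abstract Map member. Array[String] stands in for Array[MapStatus] here.

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConversions._
import scala.collection.mutable.Map

object ConcurrentMapSketch {
  // The implicit conversion from JavaConversions turns the Java map into a
  // scala.collection.mutable.Map view backed by the same thread-safe table.
  val statuses: Map[Int, Array[String]] = new ConcurrentHashMap[Int, Array[String]]

  def main(args: Array[String]): Unit = {
    // Simulate several fetch threads updating the shared map without locks.
    val threads = (0 until 4).map { t =>
      new Thread(new Runnable {
        def run(): Unit = (0 until 1000).foreach { i =>
          statuses(t * 1000 + i) = Array("mapStatus")
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(statuses.size) // 4000: no lost updates, no corruption
  }
}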

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 48 additions & 0 deletions
@@ -469,6 +469,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
+      : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, partitioner)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
    * partitioner/parallelism level.

@@ -563,6 +579,38 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int)
+      : JavaPairRDD[K, (Optional[V], Optional[W])] = {
+    val joinResult = rdd.fullOuterJoin(other, numPartitions)
+    fromRDD(joinResult.mapValues{ case (v, w) =>
+      (JavaUtils.optionToOptional(v), JavaUtils.optionToOptional(w))
+    })
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    */
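The Java overloads differ from the Scala ones only in bridging scala.Option to Optional so Java callers get a null-safe container. A minimal sketch of that conversion, under the assumption that Optional here is Guava's com.google.common.base.Optional; the optionToOptional helper below is a stand-in written for this sketch, mirroring what the diff's JavaUtils.optionToOptional call does:

import com.google.common.base.Optional

object OptionalBridgeSketch {
  // Stand-in for JavaUtils.optionToOptional: Some(v) => Optional.of(v),
  // None => Optional.absent(), so Java code never touches scala.Option.
  def optionToOptional[T](option: Option[T]): Optional[T] = option match {
    case Some(value) => Optional.of(value)
    case None        => Optional.absent()
  }

  def main(args: Array[String]): Unit = {
    // A full outer join value pair where the right side had no match.
    val pair: (Option[Int], Option[Char]) = (Some(1), None)
    val bridged = (optionToOptional(pair._1), optionToOptional(pair._2))
    println(bridged) // (Optional.of(1),Optional.absent())
  }
}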

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 42 additions & 0 deletions
@@ -506,6 +506,23 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     }
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Uses the given Partitioner to partition the output RDD.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Option[V], Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues {
+      case (vs, Seq()) => vs.map(v => (Some(v), None))
+      case (Seq(), ws) => ws.map(w => (None, Some(w)))
+      case (vs, ws) => for (v <- vs; w <- ws) yield (Some(v), Some(w))
+    }
+  }
+
   /**
    * Simplified version of combineByKey that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.

@@ -585,6 +602,31 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     rightOuterJoin(other, new HashPartitioner(numPartitions))
   }
 
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD using the existing partitioner/
+   * parallelism level.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a full outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), Some(w))) for w in `other`, or
+   * the pair (k, (Some(v), None)) if no elements in `other` have key k. Similarly, for each
+   * element (k, w) in `other`, the resulting RDD will either contain all pairs
+   * (k, (Some(v), Some(w))) for v in `this`, or the pair (k, (None, Some(w))) if no elements
+   * in `this` have key k. Hash-partitions the resulting RDD into the given number of partitions.
+   */
+  def fullOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], Option[W]))] = {
+    fullOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
   /**
    * Return the key-value pairs in this RDD to the master as a Map.
    *
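The partitioner-taking overload above carries the whole algorithm: cogroup collects, per key, the sequence of values from each side, and the three match cases turn an empty side into None. A Spark-free sketch of the same logic on plain collections, with cogroupLocal as a hypothetical stand-in for RDD.cogroup:

object FullOuterJoinSketch {
  // Hypothetical local stand-in for RDD.cogroup: for every key seen on
  // either side, pair up the values that side holds for that key.
  def cogroupLocal[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)])
      : Map[K, (Seq[V], Seq[W])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      k -> (left.collect { case (`k`, v) => v },
            right.collect { case (`k`, w) => w })
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val left = Seq((1, 1), (1, 2), (3, 1))
    val right = Seq((1, 'x'), (4, 'w'))
    // The same three cases as the commit's flatMapValues body.
    val joined = cogroupLocal(left, right).toSeq.flatMap {
      case (k, (vs, Seq())) => vs.map(v => (k, (Some(v), None)))
      case (k, (Seq(), ws)) => ws.map(w => (k, (None, Some(w))))
      case (k, (vs, ws))    => for (v <- vs; w <- ws) yield (k, (Some(v), Some(w)))
    }
    // Prints, in some order: (1,(Some(1),Some(x))), (1,(Some(2),Some(x))),
    // (3,(Some(1),None)), (4,(None,Some(w)))
    joined.foreach(println)
  }
}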

core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala

Lines changed: 7 additions & 2 deletions
@@ -69,8 +69,13 @@ private[spark] class ShuffleMapTask(
       return writer.stop(success = true).get
     } catch {
       case e: Exception =>
-        if (writer != null) {
-          writer.stop(success = false)
+        try {
+          if (writer != null) {
+            writer.stop(success = false)
+          }
+        } catch {
+          case e: Exception =>
+            log.debug("Could not stop writer", e)
         }
         throw e
     } finally {
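The fix wraps the failure-path writer.stop in its own try/catch so that an exception thrown during cleanup is logged at debug level instead of masking the task's original failure. A minimal sketch of the pattern (Writer here is a hypothetical stand-in for the shuffle writer):

// Hypothetical stand-in for the shuffle writer used in this sketch.
trait Writer {
  def write(): Unit
  def stop(success: Boolean): Unit
}

object CleanupSketch {
  def runTask(writer: Writer): Unit = {
    try {
      writer.write()
      writer.stop(success = true)
    } catch {
      case e: Exception =>
        // Best-effort cleanup: if stop() itself throws, swallow (and log)
        // that secondary error so the caller sees the original failure e.
        try {
          if (writer != null) {
            writer.stop(success = false)
          }
        } catch {
          case e2: Exception => println(s"Could not stop writer: $e2")
        }
        throw e
    }
  }

  def main(args: Array[String]): Unit = {
    val failing = new Writer {
      def write(): Unit = throw new RuntimeException("task failed")
      def stop(success: Boolean): Unit = throw new IllegalStateException("stop failed")
    }
    try runTask(failing) catch {
      case e: Exception => println(e.getMessage) // "task failed", not "stop failed"
    }
  }
}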

core/src/test/scala/org/apache/spark/PartitioningSuite.scala

Lines changed: 3 additions & 0 deletions
@@ -193,11 +193,13 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+    assert(grouped2.fullOuterJoin(grouped4).partitioner === grouped4.partitioner)
     assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
 
     assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
+    assert(grouped2.fullOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
 
     assert(grouped2.map(_ => 1).partitioner === None)

@@ -218,6 +220,7 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
+    assert(intercept[SparkException]{ arrPairs.fullOuterJoin(arrPairs) }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
     assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

Lines changed: 15 additions & 0 deletions
@@ -298,6 +298,21 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     ))
   }
 
+  test("fullOuterJoin") {
+    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val joined = rdd1.fullOuterJoin(rdd2).collect()
+    assert(joined.size === 6)
+    assert(joined.toSet === Set(
+      (1, (Some(1), Some('x'))),
+      (1, (Some(2), Some('x'))),
+      (2, (Some(1), Some('y'))),
+      (2, (Some(1), Some('z'))),
+      (3, (Some(1), None)),
+      (4, (None, Some('w')))
+    ))
+  }
+
   test("join with no matches") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
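The expected size of 6 in the fullOuterJoin test follows from per-key cross products: key 1 yields 2 × 1 pairs, key 2 yields 1 × 2, and the unmatched keys 3 and 4 each contribute a single pair with None on the missing side.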

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -193,6 +193,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(rdd.join(emptyKv).collect().size === 0)
     assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
     assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
+    assert(rdd.fullOuterJoin(emptyKv).collect().size === 2)
     assert(rdd.cogroup(emptyKv).collect().size === 2)
     assert(rdd.union(emptyKv).collect().size === 2)
   }
