diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 6e4edc7c80d7..07ab98fe7f31 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -237,6 +237,12 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()
 
+  // For each shuffleId we also maintain a Map from reducerId -> (location, size).
+  // Lazily populated whenever the statuses are requested from the DAGScheduler.
+  private val statusByReducer =
+    new TimeStampedHashMap[Int, HashMap[Int, Array[(BlockManagerId, Long)]]]()
+
+  // For cleaning up TimeStampedHashMaps
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)
 
@@ -282,6 +288,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   override def unregisterShuffle(shuffleId: Int) {
     mapStatuses.remove(shuffleId)
     cachedSerializedStatuses.remove(shuffleId)
+    statusByReducer.remove(shuffleId)
   }
 
   /** Check if the given shuffle is being tracked */
@@ -289,6 +296,32 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }
 
+  // Return the locations and block sizes of the map outputs for each reducer.
+  // The returned map is keyed by reducerId; for each reducer the value is the
+  // array of (location, size) pairs of its map outputs.
+  //
+  // This method is not thread-safe.
+  def getStatusByReducer(
+      shuffleId: Int,
+      numReducers: Int)
+    : Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
+    if (!statusByReducer.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
+      val statuses = mapStatuses(shuffleId)
+      if (statuses.length > 0) {
+        statusByReducer(shuffleId) = new HashMap[Int, Array[(BlockManagerId, Long)]]
+        var r = 0
+        while (r < numReducers) {
+          val locs = statuses.map { s =>
+            (s.location, s.getSizeForBlock(r))
+          }
+          statusByReducer(shuffleId) += (r -> locs)
+          r = r + 1
+        }
+      }
+    }
+    statusByReducer.get(shuffleId)
+  }
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1
@@ -336,6 +369,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   private def cleanup(cleanupTime: Long) {
     mapStatuses.clearOldValues(cleanupTime)
     cachedSerializedStatuses.clearOldValues(cleanupTime)
+    statusByReducer.clearOldValues(cleanupTime)
   }
 }
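For context on the shape of the data getStatusByReducer builds: each MapStatus carries one size per reducer, and the tracker transposes that into a per-reducer array of (location, size) pairs. A minimal, self-contained sketch of the same transposition (SimpleStatus and the host names here are invented stand-ins for Spark's MapStatus, not the real API):

import scala.collection.mutable.HashMap

object StatusByReducerSketch {
  // Simplified stand-in for MapStatus: a location plus one size per reducer.
  case class SimpleStatus(location: String, sizes: Array[Long]) {
    def getSizeForBlock(reduceId: Int): Long = sizes(reduceId)
  }

  def statusByReducer(
      statuses: Array[SimpleStatus],
      numReducers: Int): HashMap[Int, Array[(String, Long)]] = {
    val byReducer = new HashMap[Int, Array[(String, Long)]]
    var r = 0
    while (r < numReducers) {
      // Column r: the (location, size) contributed by every map task to reducer r.
      byReducer += (r -> statuses.map(s => (s.location, s.getSizeForBlock(r))))
      r += 1
    }
    byReducer
  }

  def main(args: Array[String]): Unit = {
    val statuses = Array(
      SimpleStatus("hostA", Array(10L, 1L)),
      SimpleStatus("hostB", Array(2L, 20L)))
    // Reducer 0 sees (hostA, 10) and (hostB, 2); reducer 1 sees (hostA, 1) and (hostB, 20).
    statusByReducer(statuses, numReducers = 2).toSeq.sortBy(_._1).foreach {
      case (r, locs) => println(s"reducer $r -> ${locs.mkString(", ")}")
    }
  }
}

Building this view once per shuffle and caching it in a TimeStampedHashMap avoids recomputing the transpose for every reduce partition the DAGScheduler asks about.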
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 79035571adb0..6716aae9be6d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -39,6 +39,7 @@ import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 import org.apache.spark.util._
+import org.apache.spark.util.collection.{Utils => CollectionUtils}
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 
 /**
@@ -128,6 +129,15 @@ class DAGScheduler(
 
   private val outputCommitCoordinator = env.outputCommitCoordinator
 
+  // Number of map and reduce tasks above which we do not assign preferred locations
+  // based on map output sizes.
+  private val SHUFFLE_PREF_MAP_THRESHOLD = 1000
+  // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
+  private val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
+  // Number of preferred locations to use for reducer tasks
+  private[scheduler] val NUM_REDUCER_PREF_LOCS = 5
+
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -1295,7 +1305,7 @@ class DAGScheduler(
   {
     // If the partition has already been visited, no need to re-visit.
     // This avoids exponential path exploration. SPARK-695
-    if (!visited.add((rdd,partition))) {
+    if (!visited.add((rdd, partition))) {
      // Nil has already been returned for previously visited partitions.
      return Nil
    }
@@ -1320,6 +1330,23 @@ class DAGScheduler(
           return locs
         }
       }
+      case s: ShuffleDependency[_, _, _] =>
+        if (rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
+            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
+          // Assign preferred locations for reducers by looking at map output locations and sizes
+          val mapStatuses = mapOutputTracker.getStatusByReducer(s.shuffleId, rdd.partitions.size)
+          mapStatuses.map { status =>
+            // Get the map output locations for this reducer
+            if (status.contains(partition)) {
+              // Select the first few locations as preferred locations for the reducer
+              val topLocs = CollectionUtils.takeOrdered(
+                status(partition).iterator, NUM_REDUCER_PREF_LOCS)(
+                Ordering.by[(BlockManagerId, Long), Long](_._2).reverse).toSeq
+              return topLocs.map(_._1).map(loc => TaskLocation(loc.host, loc.executorId))
+            }
+          }
+        }
+
       case _ =>
     }
     Nil
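The selection in the ShuffleDependency case above is a plain top-K by size: CollectionUtils.takeOrdered (Spark's helper over Guava's Ordering.leastOf) paired with the reversed ordering returns the NUM_REDUCER_PREF_LOCS entries with the largest map outputs. The same behaviour, sketched with a full sort for clarity (the hosts and sizes are invented):

object TopLocsSketch {
  val NUM_REDUCER_PREF_LOCS = 5 // mirrors the constant added to DAGScheduler

  // Entries with the largest map-output size win; ties are broken arbitrarily.
  def topLocations(locs: Seq[(String, Long)], k: Int): Seq[String] =
    locs.sortBy { case (_, size) => -size }.take(k).map { case (host, _) => host }

  def main(args: Array[String]): Unit = {
    val forReducer = Seq(("host1", 10L), ("host2", 20L), ("host3", 30L),
      ("host4", 40L), ("host5", 50L), ("host6", 60L))
    // With k = 5, host1 (the smallest contribution) is dropped.
    println(topLocations(forReducer, NUM_REDUCER_PREF_LOCS))
  }
}

Using takeOrdered instead of a full sort keeps the per-reducer cost at roughly O(n log k); the sketch sorts everything only for brevity.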
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9d0c1273695f..530562e5e966 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -438,8 +438,8 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
     // the 2nd ResultTask failed
     complete(taskSets(1), Seq(
       (Success, 42),
@@ -449,7 +449,7 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
     // we can see both result blocks now
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
     complete(taskSets(3), Seq((Success, 43)))
@@ -464,8 +464,8 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
-      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
     // The MapOutputTracker should know about both map output locations.
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
       Array("hostA", "hostB"))
@@ -507,14 +507,18 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
     assert(newEpoch > oldEpoch)
     val taskSet = taskSets(0)
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should work because it's a non-failed host
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should work because it's a new epoch
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
       Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))
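Some context for the two tests added below: makeMapStatus routes its sizes through MapStatus, which (below the 2000-partition cutoff noted in DAGScheduler) log-compresses every block size to a single byte, so getSizeForBlock later returns only an approximation. The relative order of sizes survives the round trip, which is all the second test depends on. A sketch of that encoding, assuming the base-1.1 scheme Spark's MapStatus uses (illustrative, not the production code):

object SizeEncodingSketch {
  // Encode a size as ceil(log_{1.1}(size)), capped to one byte; 0 stays 0 so
  // empty blocks remain recognizable after the round trip.
  def compress(size: Long): Byte =
    if (size == 0) 0
    else if (size <= 1) 1
    else math.min(255, math.ceil(math.log(size) / math.log(1.1)).toInt).toByte

  def decompress(b: Byte): Long =
    if (b == 0) 0 else math.pow(1.1, b & 0xFF).toLong

  def main(args: Array[String]): Unit = {
    // The sizes used in "reducer locality with different sizes": 10, 20, ..., 60.
    val sizes = (1 to 6).map(i => 10L * i)
    val roundTripped = sizes.map(s => decompress(compress(s)))
    println(sizes.zip(roundTripped)) // approximate values, e.g. 20 -> 21, 50 -> 54
    // Ordering is preserved, so the hosts with larger outputs still rank first.
    assert(roundTripped == roundTripped.sorted)
  }
}

Asserting on exact sizes would therefore be brittle; the test only relies on the host ranking.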
@@ -739,6 +743,50 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
     assertDataStructuresEmpty
   }
 
+  test("shuffle with reducer locality") {
+    // Create a shuffleMapRdd with 1 partition
+    val shuffleMapRdd = new MyRDD(sc, 1, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1))))
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
+      Array(makeBlockManagerId("hostA")))
+
+    // The reducer should run on the same host that the map task ran on
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(Seq("hostA")))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+  }
+
+  test("reducer locality with different sizes") {
+    val numMapTasks = scheduler.NUM_REDUCER_PREF_LOCS + 1
+    // Create a shuffleMapRdd with more partitions than NUM_REDUCER_PREF_LOCS
+    val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+
+    val statuses = (1 to numMapTasks).map { i =>
+      (Success, makeMapStatus("host" + i, 1, (10 * i).toByte))
+    }
+    complete(taskSets(0), statuses)
+
+    // The reducer should prefer the hosts whose map outputs are larger, i.e. the last hosts
+    val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)
+
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(hosts))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
@@ -746,12 +794,12 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
   private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
     assert(hosts.size === taskSet.tasks.size)
     for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
-      assert(taskLocs.map(_.host) === expectedLocs)
+      assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
     }
   }
 
-  private def makeMapStatus(host: String, reduces: Int): MapStatus =
-    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
+  private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
+    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
 
   private def makeBlockManagerId(host: String): BlockManagerId =
     BlockManagerId("exec-" + host, host, 12345)
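A closing note on the NOTE above SHUFFLE_PREF_REDUCE_THRESHOLD: past 2000 partitions Spark switches to HighlyCompressedMapStatus, which retains only the average block size, so every non-empty block reports the same size and size-based placement carries no signal. A toy model of that degradation (AvgOnlyStatus is an invented stand-in; Spark's real class also tracks empty blocks in a bitmap):

object AvgSizeSketch {
  // Stand-in for HighlyCompressedMapStatus: every non-empty block reports the average.
  class AvgOnlyStatus(sizes: Array[Long]) {
    private val nonEmpty = sizes.count(_ > 0)
    private val avgSize = if (nonEmpty > 0) sizes.filter(_ > 0).sum / nonEmpty else 0L
    def getSizeForBlock(reduceId: Int): Long =
      if (sizes(reduceId) > 0) avgSize else 0L
  }

  def main(args: Array[String]): Unit = {
    val status = new AvgOnlyStatus(Array(10L, 20L, 30L, 40L, 50L, 60L))
    // Every reducer sees the same size (35), so size-based preferences are meaningless.
    println((0 until 6).map(status.getSizeForBlock))
  }
}

This is why the threshold guard in DAGScheduler skips the preference computation entirely for large shuffles rather than ranking hosts on uninformative averages.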