Commit 3b464b7

Set preferred locations for reduce tasks
This is another attempt at #1697, addressing some of the earlier concerns. It adds a pair of thresholds on the number of map and reduce tasks beyond which we do not use preferred locations for reduce tasks. This patch also fixes some bugs in DAGSchedulerSuite, where the MapStatus objects created did not have the right number of reducers set.
1 parent 99bd500 · commit 3b464b7
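
The gating logic this commit adds is small; below is a minimal standalone sketch of it. The constants mirror the diff further down, while `useReduceLocality` and its parameters are illustrative names, not Spark API:

object ReduceLocalityGate {
  // Thresholds from the diff: above these we skip map-output-based locality.
  val SHUFFLE_PREF_MAP_THRESHOLD = 1000
  // Kept below 2000 because Spark switches to HighlyCompressedMapStatus
  // beyond that, which stores only approximate per-block sizes.
  val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000

  // Use preferred locations for reducers only when the shuffle is small
  // enough that computing per-reducer output sizes stays cheap.
  def useReduceLocality(numMapTasks: Int, numReduceTasks: Int): Boolean =
    numReduceTasks < SHUFFLE_PREF_REDUCE_THRESHOLD &&
      numMapTasks < SHUFFLE_PREF_MAP_THRESHOLD
}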

3 files changed, 118 insertions(+), 13 deletions(-)

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 31 additions & 0 deletions
@@ -237,6 +237,12 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]()
   private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]()

+  // For each shuffleId we also maintain a Map from reducerId -> (location, size)
+  // Lazily populated whenever the statuses are requested from DAGScheduler
+  private val statusByReducer =
+    new TimeStampedHashMap[Int, HashMap[Int, Array[(BlockManagerId, Long)]]]()
+
+
   // For cleaning up TimeStampedHashMaps
   private val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.MAP_OUTPUT_TRACKER, this.cleanup, conf)

@@ -282,13 +288,37 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   override def unregisterShuffle(shuffleId: Int) {
     mapStatuses.remove(shuffleId)
     cachedSerializedStatuses.remove(shuffleId)
+    statusByReducer.remove(shuffleId)
   }

   /** Check if the given shuffle is being tracked */
   def containsShuffle(shuffleId: Int): Boolean = {
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }

+  // Return the list of locations and blockSizes for each reducer.
+  // The map is keyed by reducerId and for each reducer the value contains the array
+  // of (location, size) of map outputs.
+  //
+  // This method is not thread-safe
+  def getStatusByReducer(shuffleId: Int, numReducers: Int): Option[Map[Int, Array[(BlockManagerId, Long)]]] = {
+    if (!statusByReducer.contains(shuffleId) && mapStatuses.contains(shuffleId)) {
+      val statuses = mapStatuses(shuffleId)
+      if (statuses.length > 0) {
+        statusByReducer(shuffleId) = new HashMap[Int, Array[(BlockManagerId, Long)]]
+        var r = 0
+        while (r < numReducers) {
+          val locs = statuses.map { s =>
+            (s.location, s.getSizeForBlock(r))
+          }
+          statusByReducer(shuffleId) += (r -> locs)
+          r = r + 1
+        }
+      }
+    }
+    statusByReducer.get(shuffleId)
+  }
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1

@@ -336,6 +366,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
   private def cleanup(cleanupTime: Long) {
     mapStatuses.clearOldValues(cleanupTime)
     cachedSerializedStatuses.clearOldValues(cleanupTime)
+    statusByReducer.clearOldValues(cleanupTime)
   }
 }
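
Conceptually, getStatusByReducer transposes the per-map-task statuses into a per-reducer view and caches it. Here is a self-contained sketch of that transpose using plain stand-in types (Location and MapOutput are illustrative substitutes for Spark's BlockManagerId and MapStatus):

import scala.collection.mutable.HashMap

case class Location(host: String, executorId: String)
case class MapOutput(location: Location, sizesByReducer: Array[Long])

// One MapOutput per map task comes in; one entry per reducer comes out,
// holding every map output's (location, size) pair for that reducer.
def byReducer(outputs: Array[MapOutput],
    numReducers: Int): HashMap[Int, Array[(Location, Long)]] = {
  val result = new HashMap[Int, Array[(Location, Long)]]
  var r = 0
  while (r < numReducers) {
    result(r) = outputs.map(o => (o.location, o.sizesByReducer(r)))
    r += 1
  }
  result
}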

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 27 additions & 1 deletion
@@ -39,6 +39,7 @@ import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 import org.apache.spark.util._
+import org.apache.spark.util.collection.{Utils => CollectionUtils}
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat

 /**
@@ -128,6 +129,15 @@ class DAGScheduler(

   private val outputCommitCoordinator = env.outputCommitCoordinator

+  // Number of map, reduce tasks above which we do not assign preferred locations
+  // based on map output sizes.
+  private val SHUFFLE_PREF_MAP_THRESHOLD = 1000
+  // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
+  private val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
+  // Number of preferred locations to use for reducer tasks
+  private[scheduler] val NUM_REDUCER_PREF_LOCS = 5
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))

@@ -1295,7 +1305,7 @@
   {
     // If the partition has already been visited, no need to re-visit.
     // This avoids exponential path exploration. SPARK-695
-    if (!visited.add((rdd,partition))) {
+    if (!visited.add((rdd, partition))) {
       // Nil has already been returned for previously visited partitions.
       return Nil
     }

@@ -1320,6 +1330,22 @@
             return locs
           }
         }
+      case s: ShuffleDependency[_, _, _] =>
+        if (rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
+            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
+          // Assign preferred locations for reducers by looking at map output location and sizes
+          val mapStatuses = mapOutputTracker.getStatusByReducer(s.shuffleId, rdd.partitions.size)
+          mapStatuses.map { status =>
+            // Get the map output locations for this reducer
+            if (status.contains(partition)) {
+              // Select first few locations as preferred locations for the reducer
+              val topLocs = CollectionUtils.takeOrdered(status(partition).iterator,
+                NUM_REDUCER_PREF_LOCS)(Ordering.by[(BlockManagerId, Long), Long](_._2).reverse).toSeq
+              return topLocs.map(_._1).map(loc => TaskLocation(loc.host, loc.executorId))
+            }
+          }
+        }
+
       case _ =>
     }
     Nil
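
The takeOrdered call with a reversed Ordering selects the NUM_REDUCER_PREF_LOCS locations holding the most map output for the reducer. Assuming CollectionUtils.takeOrdered returns the first k elements under the given ordering (a bounded top-k), it is equivalent to this standard-library snippet:

// (host, bytes of map output destined for this reducer)
val statuses = Seq(("host1", 10L), ("host2", 20L), ("host3", 30L),
  ("host4", 40L), ("host5", 50L), ("host6", 60L))

// Reversing the by-size ordering turns "smallest first" into "largest first",
// so taking 5 elements keeps the 5 biggest outputs.
val topLocs = statuses.sorted(Ordering.by[(String, Long), Long](_._2).reverse).take(5)
// topLocs.map(_._1) == Seq("host6", "host5", "host4", "host3", "host2")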

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 60 additions & 12 deletions
@@ -438,8 +438,8 @@ class DAGSchedulerSuite extends FunSuiteLike with BeforeAndAfter with LocalSpar
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
     // the 2nd ResultTask failed
     complete(taskSets(1), Seq(
       (Success, 42),

@@ -449,7 +449,7 @@
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
     // we can see both result blocks now
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
     complete(taskSets(3), Seq((Success, 43)))

@@ -464,8 +464,8 @@
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
     // The MapOutputTracker should know about both map output locations.
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
       Array("hostA", "hostB"))

@@ -507,14 +507,18 @@
     assert(newEpoch > oldEpoch)
     val taskSet = taskSets(0)
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should work because it's a non-failed host
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should be ignored for being too old
-    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should work because it's a new epoch
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
       Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))

@@ -739,19 +743,63 @@
     assertDataStructuresEmpty
   }

+  test("shuffle with reducer locality") {
+    // Create a shuffleMapRdd with 1 partition
+    val shuffleMapRdd = new MyRDD(sc, 1, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 1))))
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
+      Array(makeBlockManagerId("hostA")))
+
+    // The reducer should run on the same host that the map task ran on
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(Seq("hostA")))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+  }
+
+  test("reducer locality with different sizes") {
+    val numMapTasks = scheduler.NUM_REDUCER_PREF_LOCS + 1
+    // Create a shuffleMapRdd with more partitions
+    val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+
+    val statuses = (1 to numMapTasks).map { i =>
+      (Success, makeMapStatus("host" + i, 1, (10 * i).toByte))
+    }
+    complete(taskSets(0), statuses)
+
+    // The reducer should prefer the last hosts, where the output size is larger
+    val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)
+
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(hosts))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
    */
   private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
     assert(hosts.size === taskSet.tasks.size)
     for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
-      assert(taskLocs.map(_.host) === expectedLocs)
+      assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
     }
   }

-  private def makeMapStatus(host: String, reduces: Int): MapStatus =
-    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
+  private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
+    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))

   private def makeBlockManagerId(host: String): BlockManagerId =
     BlockManagerId("exec-" + host, host, 12345)
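
The new sizes parameter on makeMapStatus defaults to the old value, so existing two-argument call sites are untouched while the locality tests can give each host a distinct output size. The same default-parameter pattern in isolation (FakeMapStatus is a stand-in for Spark's MapStatus):

case class FakeMapStatus(host: String, sizes: Array[Long])

// The default keeps old call sites compiling unchanged; tests that care
// about locality pass a distinct size per host.
def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): FakeMapStatus =
  FakeMapStatus(host, Array.fill[Long](reduces)(sizes))

val uniform = makeMapStatus("hostA", 2)            // sizes = [2, 2]
val bigger  = makeMapStatus("hostB", 2, 20.toByte) // sizes = [20, 20]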
